diff --git a/examples/sse/index.js b/examples/sse/index.js
new file mode 100644
index 00000000000..dfe6426d454
--- /dev/null
+++ b/examples/sse/index.js
@@ -0,0 +1,40 @@
+'use strict'
+
+var express = require('../../');
+var path = require('node:path');
+
+var app = module.exports = express();
+
+// serve the client page
+app.get('/', function (req, res) {
+ res.sendFile(path.join(__dirname, 'public', 'index.html'));
+});
+
+/* -server-sent events endpoint
+ -send unnamend event
+ -The event sends ticks to the client every second.
+ -stops sending them when the client disconnects
+*/
+app.get('/events', function (req, res) {
+ var sse = res.sse();
+
+ sse.send({ message: 'connected', at: new Date().toISOString() });
+
+ var n = 0;
+ var timer = setInterval(function () {
+ sse.send({ now: new Date().toISOString() }, { event: 'tick', id: ++n });
+ }, 1000);
+
+ req.on('close', function () {
+ clearInterval(timer);
+ sse.close();
+ });
+});
+
+
+if (!module.parent) {
+ app.listen(3000);
+ console.log('Express sse-server started on port 3000');
+}
+
+
diff --git a/examples/sse/public/index.html b/examples/sse/public/index.html
new file mode 100644
index 00000000000..430a2df1494
--- /dev/null
+++ b/examples/sse/public/index.html
@@ -0,0 +1,38 @@
+
+
+
+
+ Express SSE example
+
+
+
+ Server-Sent Events
+ Streaming live from /events via the standard EventSource API.
+
+
+
+
+
diff --git a/lib/response.js b/lib/response.js
index b4755a5c060..9b062921dfb 100644
--- a/lib/response.js
+++ b/lib/response.js
@@ -29,6 +29,7 @@ var normalizeTypes = require('./utils').normalizeTypes;
var setCharset = require('./utils').setCharset;
var cookie = require('cookie');
var send = require('send');
+var SSE = require('./sse');
var extname = path.extname;
var resolve = path.resolve;
var basename = path.basename;
@@ -247,6 +248,31 @@ res.json = function json(obj) {
return this.send(body);
};
+/**
+ * Open a Server-Sent Events stream on this response.
+ *
+ * Writes the `text/event-stream` headers, starts a keep-alive heartbeat and
+ * returns an `SSE` instance to push events with:
+ *
+ * app.get('/events', function (req, res) {
+ * var sse = res.sse();
+ * sse.send({ hello: 'world' });
+ * sse.send('tick', { event: 'ping', id: 1 });
+ * });
+ *
+ * Options:
+ *
+ * - `heartbeat` keep-alive interval in ms (default `15000`, `false` to disable)
+ *
+ * @param {object} [options]
+ * @return {SSE}
+ * @public
+ */
+
+res.sse = function sse(options) {
+ return new SSE(this, options);
+};
+
/**
* Send JSON response with JSONP callback support.
*
@@ -847,7 +873,7 @@ res.redirect = function redirect(url) {
html: function(){
var u = escapeHtml(address);
body = '' + statuses.message[status] + ''
- + '' + statuses.message[status] + '. Redirecting to ' + u + '
'
+ + '' + statuses.message[status] + '. Redirecting to ' + u + '
'
},
default: function(){
diff --git a/lib/sse.js b/lib/sse.js
new file mode 100644
index 00000000000..a901dc39b15
--- /dev/null
+++ b/lib/sse.js
@@ -0,0 +1,254 @@
+'use strict';
+
+/**
+ * Module dependencies.
+ * @private
+ */
+
+var debug = require('debug')('express:sse');
+var onFinished = require('on-finished');
+var Readable = require('node:stream').Readable;
+const { Buffer } = require('node:buffer');
+
+/**
+ * Module variables.
+ * @private
+ */
+
+var DEFAULT_HEARTBEAT = 15000;
+
+/**
+ * Module exports.
+ * @public
+ */
+
+module.exports = SSE;
+
+/**
+ * Initialize a new Server-Sent Events stream on the given response.
+ *
+ * Opens the stream by writing the `text/event-stream` headers and starts a
+ * heartbeat that keeps the connection (and any intermediary proxies) alive.
+ * The heartbeat is cleared automatically once the connection ends.
+ *
+ * Options:
+ *
+ * - `heartbeat` interval in ms for keep-alive comments (default `15000`,
+ * `false` to disable)
+ *
+ * @param {http.ServerResponse} res
+ * @param {object} [options]
+ * @public
+ */
+
+function SSE(res, options) {
+ var opts = options || {};
+ var self = this;
+
+ this.res = res;
+ this.finished = false;
+ this._heartbeat = null;
+
+ debug('open stream');
+
+ // open the event stream
+ res.statusCode = 200;
+ res.set({
+ 'Content-Type': 'text/event-stream',
+ 'Cache-Control': 'no-cache, no-transform',
+ 'Connection': 'keep-alive',
+ 'X-Accel-Buffering': 'no'
+ });
+
+ // flush the headers so the client establishes the connection immediately
+ res.flushHeaders();
+
+ // keep the connection alive with periodic comments
+ var heartbeat = opts.heartbeat === undefined ? DEFAULT_HEARTBEAT : opts.heartbeat;
+
+ if (heartbeat) {
+ this._heartbeat = setInterval(function onHeartbeat() {
+ self.comment('');
+ }, heartbeat);
+
+ // do not let the heartbeat keep the process alive on its own
+ this._heartbeat.unref();
+ }
+
+ // stop the heartbeat once the underlying connection is gone
+ onFinished(res, function onClose() {
+ debug('close stream');
+ self.finished = true;
+ self._clearHeartbeat();
+ });
+}
+
+/**
+ * Send an event with the given `data`.
+ *
+ * `data` is serialized the same way the rest of the framework serializes a
+ * response body: strings are sent as-is, `Buffer`/`TypedArray` are decoded as
+ * UTF-8 text, objects are `JSON.stringify`'d and everything else is coerced
+ * with `String()`. Multi-line payloads are split into multiple `data:` lines
+ * as required by the protocol.
+ *
+ * Options:
+ *
+ * - `event` the event name (`event:` field)
+ * - `id` the event id (`id:` field)
+ * - `retry` the client reconnection time in ms (`retry:` field)
+ *
+ * @param {*} data
+ * @param {object} [options]
+ * @return {SSE} for chaining
+ * @public
+ */
+
+SSE.prototype.send = function send(data, options) {
+ var opts = options || {};
+ var frame = '';
+
+ if (opts.id != null) frame += 'id: ' + opts.id + '\n';
+ if (opts.event != null) frame += 'event: ' + opts.event + '\n';
+ if (opts.retry != null) frame += 'retry: ' + opts.retry + '\n';
+
+ // a payload may span multiple lines, each needs its own `data:` field
+ var lines = format(data).split('\n');
+ for (var i = 0; i < lines.length; i++) {
+ frame += 'data: ' + lines[i] + '\n';
+ }
+
+ return this._write(frame + '\n');
+};
+
+/**
+ * Stream a source, emitting one event per chunk, and resolve when it ends.
+ *
+ * Any stream-like source is accepted by bridging it into a Node `Readable`:
+ * a web `ReadableStream` or `Blob` via `Readable.fromWeb()`, and Node streams
+ * or async iterables directly. Each chunk is passed through `send()`, so the
+ * same serialization and `options` apply.
+ *
+ * @param {ReadableStream|Readable|Blob|AsyncIterable|Iterable} source
+ * @param {object} [options]
+ * @return {Promise} resolves with the stream when the source ends
+ * @public
+ */
+
+SSE.prototype.stream = async function stream(source, options) {
+ var readable = toReadable(source);
+
+ for await (var chunk of readable) {
+ if (this.finished) break;
+ this.send(chunk, options);
+ }
+
+ return this;
+};
+
+/**
+ * Write a comment line, used as a keep-alive that does not trigger `onmessage`.
+ *
+ * @param {string} [text]
+ * @return {SSE} for chaining
+ * @public
+ */
+
+SSE.prototype.comment = function comment(text) {
+ return this._write(': ' + (text == null ? '' : text) + '\n\n');
+};
+
+/**
+ * Close the stream and stop the heartbeat.
+ *
+ * @return {SSE} for chaining
+ * @public
+ */
+
+SSE.prototype.close = function close() {
+ this._clearHeartbeat();
+
+ if (!this.finished) {
+ this.finished = true;
+ this.res.end();
+ }
+
+ return this;
+};
+
+/**
+ * Write a raw chunk if the connection is still open.
+ *
+ * @param {string} chunk
+ * @return {SSE} for chaining
+ * @private
+ */
+
+SSE.prototype._write = function _write(chunk) {
+ if (!this.finished) {
+ this.res.write(chunk);
+ }
+
+ return this;
+};
+
+/**
+ * Clear the heartbeat interval if one is running.
+ *
+ * @private
+ */
+
+SSE.prototype._clearHeartbeat = function _clearHeartbeat() {
+ if (this._heartbeat) {
+ clearInterval(this._heartbeat);
+ this._heartbeat = null;
+ }
+};
+
+/**
+ * Serialize a value to the text payload of a `data:` field.
+ *
+ * @param {*} data
+ * @return {string}
+ * @private
+ */
+
+function format(data) {
+ if (data == null) {
+ return '';
+ }
+
+ if (ArrayBuffer.isView(data)) {
+ return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('utf8');
+ }
+
+ if (typeof data === 'object') {
+ return JSON.stringify(data);
+ }
+
+ return String(data);
+}
+
+/**
+ * Bridge any stream-like source into a Node `Readable`.
+ *
+ * @param {ReadableStream|Readable|Blob|AsyncIterable|Iterable} source
+ * @return {Readable}
+ * @private
+ */
+
+function toReadable(source) {
+ if (source instanceof Readable) {
+ return source;
+ }
+
+ if (typeof ReadableStream !== 'undefined' && source instanceof ReadableStream) {
+ return Readable.fromWeb(source);
+ }
+
+ if (typeof Blob !== 'undefined' && source instanceof Blob) {
+ return Readable.fromWeb(source.stream());
+ }
+
+ return Readable.from(source);
+}
diff --git a/test/res.sse.js b/test/res.sse.js
new file mode 100644
index 00000000000..8af30586195
--- /dev/null
+++ b/test/res.sse.js
@@ -0,0 +1,229 @@
+'use strict'
+
+var express = require('../')
+ , request = require('supertest')
+ , assert = require('node:assert')
+ , Readable = require('node:stream').Readable;
+
+const { Buffer } = require('node:buffer');
+
+describe('res', function () {
+ describe('.sse()', function () {
+ it('should set the event-stream headers', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ res.sse({ heartbeat: false }).close();
+ });
+
+ request(app)
+ .get('/')
+ .expect('Content-Type', 'text/event-stream; charset=utf-8')
+ .expect('Cache-Control', 'no-cache, no-transform')
+ .expect('Connection', 'keep-alive')
+ .expect(200, done);
+ })
+
+ it('should send a string as a single data field', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.send('hello');
+ sse.close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: hello\n\n', done);
+ })
+
+ it('should serialize objects as JSON', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.send({ hello: 'world' });
+ sse.close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: {"hello":"world"}\n\n', done);
+ })
+
+ it('should decode Buffers as utf-8 text', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.send(Buffer.from('café'));
+ sse.close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: café\n\n', done);
+ })
+
+ it('should write id, event and retry fields', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.send('payload', { event: 'update', id: 42, retry: 3000 });
+ sse.close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'id: 42\nevent: update\nretry: 3000\ndata: payload\n\n', done);
+ })
+
+ it('should split multi-line data into multiple data fields', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.send('line one\nline two');
+ sse.close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: line one\ndata: line two\n\n', done);
+ })
+
+ it('should write a comment line', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.comment('keep-alive');
+ sse.close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, ': keep-alive\n\n', done);
+ })
+
+ it('should be chainable', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ res.sse({ heartbeat: false })
+ .send('a')
+ .send('b')
+ .close();
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: a\n\ndata: b\n\n', done);
+ })
+
+ describe('.stream()', function () {
+ it('should emit one event per chunk of a Node Readable', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.stream(Readable.from(['one', 'two'])).then(function () {
+ sse.close();
+ });
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: one\n\ndata: two\n\n', done);
+ })
+
+ it('should emit one event per chunk of a web ReadableStream', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ var source = new ReadableStream({
+ start: function (controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.close();
+ }
+ });
+ sse.stream(source, { event: 'chunk' }).then(function () {
+ sse.close();
+ });
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'event: chunk\ndata: a\n\nevent: chunk\ndata: b\n\n', done);
+ })
+
+ it('should resolve with the stream instance', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.stream(Readable.from(['x'])).then(function (returned) {
+ assert.strictEqual(returned, sse);
+ sse.close();
+ });
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: x\n\n', done);
+ })
+
+ it('should emit one event per chunk of an async iterable', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ async function * gen () {
+ yield 'a';
+ yield 'b';
+ }
+ sse.stream(gen()).then(function () {
+ sse.close();
+ });
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: a\n\ndata: b\n\n', done);
+ })
+
+ it('should stream the contents of a Blob', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.stream(new Blob(['hello world'])).then(function () {
+ sse.close();
+ });
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, 'data: hello world\n\n', done);
+ })
+ })
+
+ it('should not throw when writing after close', function (done) {
+ var app = express();
+
+ app.get('/', function (req, res) {
+ var sse = res.sse({ heartbeat: false });
+ sse.close();
+ sse.send('ignored');
+ });
+
+ request(app)
+ .get('/')
+ .expect(200, '', done);
+ })
+ })
+})