Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/sse/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'

var express = require('../../');
var path = require('node:path');

var app = module.exports = express();

// serve the client page
app.get('/', function (req, res) {
res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

/* -server-sent events endpoint
-send unnamend event
-The event sends ticks to the client every second.
-stops sending them when the client disconnects
*/
app.get('/events', function (req, res) {
var sse = res.sse();

sse.send({ message: 'connected', at: new Date().toISOString() });

var n = 0;
var timer = setInterval(function () {
sse.send({ now: new Date().toISOString() }, { event: 'tick', id: ++n });
}, 1000);

req.on('close', function () {
clearInterval(timer);
sse.close();
});
});


if (!module.parent) {
app.listen(3000);
console.log('Express sse-server started on port 3000');
}


38 changes: 38 additions & 0 deletions examples/sse/public/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Express SSE example</title>
<style>
body { font: 14px/1.5 system-ui, sans-serif; margin: 2rem; }
#log { list-style: none; padding: 0; }
#log li { padding: 2px 0; border-bottom: 1px solid #eee; }
.name { color: #888; }
</style>
</head>
<body>
<h1>Server-Sent Events</h1>
<p>Streaming live from <code>/events</code> via the standard <code>EventSource</code> API.</p>
<ul id="log"></ul>

<script>
var log = document.getElementById('log')

function add(name, data) {
var li = document.createElement('li')
li.innerHTML = '<span class="name">' + name + '</span> ' + data
log.prepend(li)
}

var es = new EventSource('/events')

// unnamed events arrive on `onmessage`
es.onmessage = function (e) { add('message', e.data) }

// named events are listened to explicitly
es.addEventListener('tick', function (e) { add('tick #' + e.lastEventId, e.data) })

es.onerror = function () { add('error', 'connection lost, retrying…') }
</script>
</body>
</html>
28 changes: 27 additions & 1 deletion lib/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var normalizeTypes = require('./utils').normalizeTypes;
var setCharset = require('./utils').setCharset;
var cookie = require('cookie');
var send = require('send');
var SSE = require('./sse');
var extname = path.extname;
var resolve = path.resolve;
var basename = path.basename;
Expand Down Expand Up @@ -247,6 +248,31 @@ res.json = function json(obj) {
return this.send(body);
};

/**
* Open a Server-Sent Events stream on this response.
*
* Writes the `text/event-stream` headers, starts a keep-alive heartbeat and
* returns an `SSE` instance to push events with:
*
* app.get('/events', function (req, res) {
* var sse = res.sse();
* sse.send({ hello: 'world' });
* sse.send('tick', { event: 'ping', id: 1 });
* });
*
* Options:
*
* - `heartbeat` keep-alive interval in ms (default `15000`, `false` to disable)
*
* @param {object} [options]
* @return {SSE}
* @public
*/

res.sse = function sse(options) {
return new SSE(this, options);
};

/**
* Send JSON response with JSONP callback support.
*
Expand Down Expand Up @@ -847,7 +873,7 @@ res.redirect = function redirect(url) {
html: function(){
var u = escapeHtml(address);
body = '<!DOCTYPE html><head><title>' + statuses.message[status] + '</title></head>'
+ '<body><p>' + statuses.message[status] + '. Redirecting to ' + u + '</p></body>'
+ '<body><p>' + statuses.message[status] + '. Redirecting to ' + u + '</p></body>'
},

default: function(){
Expand Down
254 changes: 254 additions & 0 deletions lib/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
'use strict';

/**
* Module dependencies.
* @private
*/

var debug = require('debug')('express:sse');
var onFinished = require('on-finished');
var Readable = require('node:stream').Readable;
const { Buffer } = require('node:buffer');

/**
* Module variables.
* @private
*/

var DEFAULT_HEARTBEAT = 15000;

/**
* Module exports.
* @public
*/

module.exports = SSE;

/**
* Initialize a new Server-Sent Events stream on the given response.
*
* Opens the stream by writing the `text/event-stream` headers and starts a
* heartbeat that keeps the connection (and any intermediary proxies) alive.
* The heartbeat is cleared automatically once the connection ends.
*
* Options:
*
* - `heartbeat` interval in ms for keep-alive comments (default `15000`,
* `false` to disable)
*
* @param {http.ServerResponse} res
* @param {object} [options]
* @public
*/

function SSE(res, options) {
var opts = options || {};
var self = this;

this.res = res;
this.finished = false;
this._heartbeat = null;

debug('open stream');

// open the event stream
res.statusCode = 200;
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});

// flush the headers so the client establishes the connection immediately
res.flushHeaders();

// keep the connection alive with periodic comments
var heartbeat = opts.heartbeat === undefined ? DEFAULT_HEARTBEAT : opts.heartbeat;

if (heartbeat) {
this._heartbeat = setInterval(function onHeartbeat() {
self.comment('');
}, heartbeat);

// do not let the heartbeat keep the process alive on its own
this._heartbeat.unref();
}

// stop the heartbeat once the underlying connection is gone
onFinished(res, function onClose() {
debug('close stream');
self.finished = true;
self._clearHeartbeat();
});
}

/**
* Send an event with the given `data`.
*
* `data` is serialized the same way the rest of the framework serializes a
* response body: strings are sent as-is, `Buffer`/`TypedArray` are decoded as
* UTF-8 text, objects are `JSON.stringify`'d and everything else is coerced
* with `String()`. Multi-line payloads are split into multiple `data:` lines
* as required by the protocol.
*
* Options:
*
* - `event` the event name (`event:` field)
* - `id` the event id (`id:` field)
* - `retry` the client reconnection time in ms (`retry:` field)
*
* @param {*} data
* @param {object} [options]
* @return {SSE} for chaining
* @public
*/

SSE.prototype.send = function send(data, options) {
var opts = options || {};
var frame = '';

if (opts.id != null) frame += 'id: ' + opts.id + '\n';
if (opts.event != null) frame += 'event: ' + opts.event + '\n';
if (opts.retry != null) frame += 'retry: ' + opts.retry + '\n';

// a payload may span multiple lines, each needs its own `data:` field
var lines = format(data).split('\n');
for (var i = 0; i < lines.length; i++) {
frame += 'data: ' + lines[i] + '\n';
}

return this._write(frame + '\n');
};

/**
* Stream a source, emitting one event per chunk, and resolve when it ends.
*
* Any stream-like source is accepted by bridging it into a Node `Readable`:
* a web `ReadableStream` or `Blob` via `Readable.fromWeb()`, and Node streams
* or async iterables directly. Each chunk is passed through `send()`, so the
* same serialization and `options` apply.
*
* @param {ReadableStream|Readable|Blob|AsyncIterable|Iterable} source
* @param {object} [options]
* @return {Promise<SSE>} resolves with the stream when the source ends
* @public
*/

SSE.prototype.stream = async function stream(source, options) {
var readable = toReadable(source);

for await (var chunk of readable) {
if (this.finished) break;
this.send(chunk, options);
}

return this;
};

/**
* Write a comment line, used as a keep-alive that does not trigger `onmessage`.
*
* @param {string} [text]
* @return {SSE} for chaining
* @public
*/

SSE.prototype.comment = function comment(text) {
return this._write(': ' + (text == null ? '' : text) + '\n\n');
};

/**
* Close the stream and stop the heartbeat.
*
* @return {SSE} for chaining
* @public
*/

SSE.prototype.close = function close() {
this._clearHeartbeat();

if (!this.finished) {
this.finished = true;
this.res.end();
}

return this;
};

/**
* Write a raw chunk if the connection is still open.
*
* @param {string} chunk
* @return {SSE} for chaining
* @private
*/

SSE.prototype._write = function _write(chunk) {
if (!this.finished) {
this.res.write(chunk);
}

return this;
};

/**
* Clear the heartbeat interval if one is running.
*
* @private
*/

SSE.prototype._clearHeartbeat = function _clearHeartbeat() {
if (this._heartbeat) {
clearInterval(this._heartbeat);
this._heartbeat = null;
}
};

/**
* Serialize a value to the text payload of a `data:` field.
*
* @param {*} data
* @return {string}
* @private
*/

function format(data) {
if (data == null) {
return '';
}

if (ArrayBuffer.isView(data)) {
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('utf8');
}

if (typeof data === 'object') {
return JSON.stringify(data);
}

return String(data);
}

/**
* Bridge any stream-like source into a Node `Readable`.
*
* @param {ReadableStream|Readable|Blob|AsyncIterable|Iterable} source
* @return {Readable}
* @private
*/

function toReadable(source) {
if (source instanceof Readable) {
return source;
}

if (typeof ReadableStream !== 'undefined' && source instanceof ReadableStream) {
return Readable.fromWeb(source);
}

if (typeof Blob !== 'undefined' && source instanceof Blob) {
return Readable.fromWeb(source.stream());
}

return Readable.from(source);
}
Loading
Loading