From 90033867f695f25b032a014cfcf367ea685cf5a6 Mon Sep 17 00:00:00 2001 From: Burton Wevers Date: Fri, 15 Jun 2018 17:54:30 +0200 Subject: [PATCH 1/3] Added time lapse checking for pings added debug output for testing purposes --- lib/parser.js | 17 ++++++++++++++++- lib/twitter.js | 17 ++++++++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/lib/parser.js b/lib/parser.js index 7a4c9184..d4aaa90d 100644 --- a/lib/parser.js +++ b/lib/parser.js @@ -27,6 +27,10 @@ Parser.prototype = Object.create(EventEmitter.prototype); Parser.END = '\r\n'; Parser.END_LENGTH = 2; +var start = -1; +const NS_PER_SEC = 1e9; +const PING_PERIOD_THRESHOLD = 60; + Parser.prototype.receive = function receive(buffer) { this.buffer += buffer.toString('utf8'); var index, json; @@ -36,6 +40,7 @@ Parser.prototype.receive = function receive(buffer) { json = this.buffer.slice(0, index); this.buffer = this.buffer.slice(index + Parser.END_LENGTH); if (json.length > 0) { + console.log("[TWITTER STREAM OUTPUT]: ", json); try { json = JSON.parse(json); // Event message @@ -64,8 +69,18 @@ Parser.prototype.receive = function receive(buffer) { } } else { + if (start !== -1) { + var end = process.hrtime(start); + var secondsElapsed = (end[0] * NS_PER_SEC + end[1]) / NS_PER_SEC; + console.log('PING took ' + secondsElapsed + ' seconds'); + if (secondsElapsed > PING_PERIOD_THRESHOLD) + this.emit('disconnect') + } + console.log("[SENDING PING?] ", (new Date()).toISOString()); + start = process.hrtime(); + // Keep Alive this.emit('ping'); } } -}; +}; \ No newline at end of file diff --git a/lib/twitter.js b/lib/twitter.js index 1dbf3e27..eb2d88ca 100644 --- a/lib/twitter.js +++ b/lib/twitter.js @@ -274,27 +274,38 @@ Twitter.prototype.stream = function(method, params, callback) { request.on('response', function(response) { if(response.statusCode !== 200) { + console.log("Status Code: " + response.statusCode); stream.emit('error', new Error('Status Code: ' + response.statusCode)); } else { + console.log("[RESPONSE]: " + response.statusCode); stream.emit('response', response); } response.on('data', function(chunk) { + if (typeof chunk === "string" || chunk.length > 2) + console.log("[DATA "+ chunk.length +"]: " + chunk); stream.receive(chunk); }); response.on('error', function(error) { - stream.emit('error', error); + console.log("[ERROR]: " + error); + stream.emit('error', error); }); + response.on('disconnect', function (){ + stream.emit('disconnect', response); + }) + response.on('end', function() { - stream.emit('end', response); + console.log("[END]: " + response.statusCode); + stream.emit('end', response); }); }); request.on('error', function(error) { - stream.emit('error', error); + console.log("[ERROR]: " + error); + stream.emit('error', error); }); request.end(); From 5ba5dcd5c84b72770fc5aaf1f681e0f1602520c9 Mon Sep 17 00:00:00 2001 From: Burton Wevers Date: Fri, 15 Jun 2018 18:01:28 +0200 Subject: [PATCH 2/3] Added debug package to the project Move console.log outputs to debug library --- lib/parser.js | 7 ++++--- lib/twitter.js | 13 +++++++------ package.json | 3 ++- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/parser.js b/lib/parser.js index d4aaa90d..56fc34e6 100644 --- a/lib/parser.js +++ b/lib/parser.js @@ -13,6 +13,7 @@ // ... var EventEmitter = require('events').EventEmitter; +const debug = require('debug')('Parser'); var Parser = module.exports = function Parser() { // Make sure we call our parents constructor @@ -40,7 +41,7 @@ Parser.prototype.receive = function receive(buffer) { json = this.buffer.slice(0, index); this.buffer = this.buffer.slice(index + Parser.END_LENGTH); if (json.length > 0) { - console.log("[TWITTER STREAM OUTPUT]: ", json); + debug("[TWITTER STREAM OUTPUT]: ", json); try { json = JSON.parse(json); // Event message @@ -72,11 +73,11 @@ Parser.prototype.receive = function receive(buffer) { if (start !== -1) { var end = process.hrtime(start); var secondsElapsed = (end[0] * NS_PER_SEC + end[1]) / NS_PER_SEC; - console.log('PING took ' + secondsElapsed + ' seconds'); + debug('PING took ' + secondsElapsed + ' seconds'); if (secondsElapsed > PING_PERIOD_THRESHOLD) this.emit('disconnect') } - console.log("[SENDING PING?] ", (new Date()).toISOString()); + debug("[SENDING PING?] ", (new Date()).toISOString()); start = process.hrtime(); // Keep Alive diff --git a/lib/twitter.js b/lib/twitter.js index eb2d88ca..71b3cf58 100644 --- a/lib/twitter.js +++ b/lib/twitter.js @@ -8,6 +8,7 @@ var url = require('url'); var Streamparser = require('./parser'); var request = require('request'); var extend = require('deep-extend'); +const debug = require('debug')('Twitter'); // Package version var VERSION = require('../package.json').version; @@ -274,22 +275,22 @@ Twitter.prototype.stream = function(method, params, callback) { request.on('response', function(response) { if(response.statusCode !== 200) { - console.log("Status Code: " + response.statusCode); + debug("Status Code: " + response.statusCode); stream.emit('error', new Error('Status Code: ' + response.statusCode)); } else { - console.log("[RESPONSE]: " + response.statusCode); + debug("[RESPONSE]: " + response.statusCode); stream.emit('response', response); } response.on('data', function(chunk) { if (typeof chunk === "string" || chunk.length > 2) - console.log("[DATA "+ chunk.length +"]: " + chunk); + debug("[DATA "+ chunk.length +"]: " + chunk); stream.receive(chunk); }); response.on('error', function(error) { - console.log("[ERROR]: " + error); + debug("[ERROR]: " + error); stream.emit('error', error); }); @@ -298,13 +299,13 @@ Twitter.prototype.stream = function(method, params, callback) { }) response.on('end', function() { - console.log("[END]: " + response.statusCode); + debug("[END]: " + response.statusCode); stream.emit('end', response); }); }); request.on('error', function(error) { - console.log("[ERROR]: " + error); + debug("[ERROR]: " + error); stream.emit('error', error); }); request.end(); diff --git a/package.json b/package.json index f790385c..112dfa05 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,8 @@ }, "dependencies": { "deep-extend": "^0.5.0", - "request": "^2.72.0" + "request": "^2.72.0", + "debug": "3.1.0" }, "devDependencies": { "eslint": "^3.12.0", From faf8603a894b2f185b65726585a95753641ec7a5 Mon Sep 17 00:00:00 2001 From: Burton Wevers Date: Fri, 15 Jun 2018 21:03:36 +0200 Subject: [PATCH 3/3] Added a timeout function to execute a disconnected based on the threshold supplied --- lib/parser.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/parser.js b/lib/parser.js index 56fc34e6..1543a173 100644 --- a/lib/parser.js +++ b/lib/parser.js @@ -30,11 +30,17 @@ Parser.END_LENGTH = 2; var start = -1; const NS_PER_SEC = 1e9; -const PING_PERIOD_THRESHOLD = 60; +const PING_PERIOD_THRESHOLD = 60; // in seconds +var timeoutFunction, self; +var timeout = function () { + debug('Timeout Reached, sending disconnect packet'); + self.emit('disconnect'); +} Parser.prototype.receive = function receive(buffer) { this.buffer += buffer.toString('utf8'); var index, json; + self = this; // We have END? while ((index = this.buffer.indexOf(Parser.END)) > -1) { @@ -74,11 +80,11 @@ Parser.prototype.receive = function receive(buffer) { var end = process.hrtime(start); var secondsElapsed = (end[0] * NS_PER_SEC + end[1]) / NS_PER_SEC; debug('PING took ' + secondsElapsed + ' seconds'); - if (secondsElapsed > PING_PERIOD_THRESHOLD) - this.emit('disconnect') + clearTimeout(timeoutFunction); } - debug("[SENDING PING?] ", (new Date()).toISOString()); + debug("[RECEIVED PING?] ", (new Date()).toISOString()); start = process.hrtime(); + timeoutFunction = setTimeout(timeout, (PING_PERIOD_THRESHOLD * 1000)); // Keep Alive this.emit('ping');