diff --git a/lib/parser.js b/lib/parser.js index 7a4c9184..1543a173 100644 --- a/lib/parser.js +++ b/lib/parser.js @@ -13,6 +13,7 @@ // ... var EventEmitter = require('events').EventEmitter; +const debug = require('debug')('Parser'); var Parser = module.exports = function Parser() { // Make sure we call our parents constructor @@ -27,15 +28,26 @@ Parser.prototype = Object.create(EventEmitter.prototype); Parser.END = '\r\n'; Parser.END_LENGTH = 2; +var start = -1; +const NS_PER_SEC = 1e9; +const PING_PERIOD_THRESHOLD = 60; // in seconds +var timeoutFunction, self; +var timeout = function () { + debug('Timeout Reached, sending disconnect packet'); + self.emit('disconnect'); +} + Parser.prototype.receive = function receive(buffer) { this.buffer += buffer.toString('utf8'); var index, json; + self = this; // We have END? while ((index = this.buffer.indexOf(Parser.END)) > -1) { json = this.buffer.slice(0, index); this.buffer = this.buffer.slice(index + Parser.END_LENGTH); if (json.length > 0) { + debug("[TWITTER STREAM OUTPUT]: ", json); try { json = JSON.parse(json); // Event message @@ -64,8 +76,18 @@ Parser.prototype.receive = function receive(buffer) { } } else { + if (start !== -1) { + var end = process.hrtime(start); + var secondsElapsed = (end[0] * NS_PER_SEC + end[1]) / NS_PER_SEC; + debug('PING took ' + secondsElapsed + ' seconds'); + clearTimeout(timeoutFunction); + } + debug("[RECEIVED PING?] ", (new Date()).toISOString()); + start = process.hrtime(); + timeoutFunction = setTimeout(timeout, (PING_PERIOD_THRESHOLD * 1000)); + // Keep Alive this.emit('ping'); } } -}; +}; \ No newline at end of file diff --git a/lib/twitter.js b/lib/twitter.js index 1dbf3e27..71b3cf58 100644 --- a/lib/twitter.js +++ b/lib/twitter.js @@ -8,6 +8,7 @@ var url = require('url'); var Streamparser = require('./parser'); var request = require('request'); var extend = require('deep-extend'); +const debug = require('debug')('Twitter'); // Package version var VERSION = require('../package.json').version; @@ -274,27 +275,38 @@ Twitter.prototype.stream = function(method, params, callback) { request.on('response', function(response) { if(response.statusCode !== 200) { + debug("Status Code: " + response.statusCode); stream.emit('error', new Error('Status Code: ' + response.statusCode)); } else { + debug("[RESPONSE]: " + response.statusCode); stream.emit('response', response); } response.on('data', function(chunk) { + if (typeof chunk === "string" || chunk.length > 2) + debug("[DATA "+ chunk.length +"]: " + chunk); stream.receive(chunk); }); response.on('error', function(error) { - stream.emit('error', error); + debug("[ERROR]: " + error); + stream.emit('error', error); }); + response.on('disconnect', function (){ + stream.emit('disconnect', response); + }) + response.on('end', function() { - stream.emit('end', response); + debug("[END]: " + response.statusCode); + stream.emit('end', response); }); }); request.on('error', function(error) { - stream.emit('error', error); + debug("[ERROR]: " + error); + stream.emit('error', error); }); request.end(); diff --git a/package.json b/package.json index f790385c..112dfa05 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,8 @@ }, "dependencies": { "deep-extend": "^0.5.0", - "request": "^2.72.0" + "request": "^2.72.0", + "debug": "3.1.0" }, "devDependencies": { "eslint": "^3.12.0",