diff --git a/lib/twitter.js b/lib/twitter.js index 1e226acf..c139c4f1 100644 --- a/lib/twitter.js +++ b/lib/twitter.js @@ -194,54 +194,93 @@ Twitter.prototype.post = function(url, params, callback) { /** * STREAM */ -Twitter.prototype.stream = function (method, params, callback) { - if (typeof params === 'function') { - callback = params; - params = {}; - } +Twitter.prototype.stream = (function () { + var savedParams = {}; - var base = 'stream'; + return function(method, params, callback) { - if (method === 'user' || method === 'site') { - base = method + '_' + base; - } + // Use parameters from most recent call when we need to retry request + savedParams.method = method; + savedParams.params = params; + savedParams.callback = callback; - var url = this.__buildEndpoint(method, base); + if (typeof params === 'function') { + savedParams.callback = callback = params; + savedParams.params = params = {}; + } - var request = this.request({ url: url, qs: params}); + // Don't start new request if we're waiting for a retry + if (typeof savedParams.pendingTimeout !== 'undefined' && savedParams.pendingTimeout) { + return; + } - var stream = new streamparser(); - stream.destroy = function() { - // FIXME: should we emit end/close on explicit destroy? - if ( typeof request.abort === 'function' ) - request.abort(); // node v0.4.0 - else - request.socket.destroy(); - }; + var base = 'stream'; - request.on('response', function(response) { - response.on('data', function(chunk) { - stream.receive(chunk); - }); + if (method === 'user' || method === 'site') { + base = method + '_' + base; + } - response.on('error', function(error) { - stream.emit('error', error); - }); + var url = this.__buildEndpoint(method, base); + + var request = this.request({ url: url, qs: params}); + + var stream = new streamparser(); + stream.destroy = function() { + // FIXME: should we emit end/close on explicit destroy? + if ( typeof request.abort === 'function' ) + request.abort(); // node v0.4.0 + else + request.socket.destroy(); + }; + + (function(state, savedParams) { + request.on('response', function(response) { - response.on('end', function() { - stream.emit('end', response); + // We've sent too many requests - cancel current request and retry in 10s + if (response.statusCode === 420) { + savedParams.pendingTimeout = true; + stream.destroy(); + + var timeout = function() { + savedParams.pendingTimeout = false; + + // Call stream again with latest parameters + state.stream( + savedParams.method, + savedParams.params, + savedParams.callback + ); + }; + + setTimeout(timeout, 10000); + return; + } + + response.on('data', function(chunk) { + stream.receive(chunk); + }); + + response.on('error', function(error) { + stream.emit('error', error); + }); + + response.on('end', function() { + stream.emit('end', response); + }); + }); + })(this, savedParams); + + request.on('error', function(error) { + stream.emit('error', error); }); - }); - request.on('error', function(error) { - stream.emit('error', error); - }); - request.end(); + request.end(); - if ( typeof callback === 'function' ) { - callback(stream); - } -}; + if ( typeof callback === 'function' ) { + callback(stream); + } + }; +})(); module.exports = Twitter;