Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 76 additions & 37 deletions lib/twitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,54 +194,93 @@ Twitter.prototype.post = function(url, params, callback) {
/**
* STREAM
*/
Twitter.prototype.stream = function (method, params, callback) {
if (typeof params === 'function') {
callback = params;
params = {};
}
Twitter.prototype.stream = (function () {
var savedParams = {};

var base = 'stream';
return function(method, params, callback) {

if (method === 'user' || method === 'site') {
base = method + '_' + base;
}
// Use parameters from most recent call when we need to retry request
savedParams.method = method;
savedParams.params = params;
savedParams.callback = callback;

var url = this.__buildEndpoint(method, base);
if (typeof params === 'function') {
savedParams.callback = callback = params;
savedParams.params = params = {};
}

var request = this.request({ url: url, qs: params});
// Don't start new request if we're waiting for a retry
if (typeof savedParams.pendingTimeout !== 'undefined' && savedParams.pendingTimeout) {
return;
}

var stream = new streamparser();
stream.destroy = function() {
// FIXME: should we emit end/close on explicit destroy?
if ( typeof request.abort === 'function' )
request.abort(); // node v0.4.0
else
request.socket.destroy();
};
var base = 'stream';

request.on('response', function(response) {
response.on('data', function(chunk) {
stream.receive(chunk);
});
if (method === 'user' || method === 'site') {
base = method + '_' + base;
}

response.on('error', function(error) {
stream.emit('error', error);
});
var url = this.__buildEndpoint(method, base);

var request = this.request({ url: url, qs: params});

var stream = new streamparser();
stream.destroy = function() {
// FIXME: should we emit end/close on explicit destroy?
if ( typeof request.abort === 'function' )
request.abort(); // node v0.4.0
else
request.socket.destroy();
};

(function(state, savedParams) {
request.on('response', function(response) {

response.on('end', function() {
stream.emit('end', response);
// We've sent too many requests - cancel current request and retry in 10s
if (response.statusCode === 420) {
savedParams.pendingTimeout = true;
stream.destroy();

var timeout = function() {
savedParams.pendingTimeout = false;

// Call stream again with latest parameters
state.stream(
savedParams.method,
savedParams.params,
savedParams.callback
);
};

setTimeout(timeout, 10000);
return;
}

response.on('data', function(chunk) {
stream.receive(chunk);
});

response.on('error', function(error) {
stream.emit('error', error);
});

response.on('end', function() {
stream.emit('end', response);
});
});
})(this, savedParams);

request.on('error', function(error) {
stream.emit('error', error);
});
});

request.on('error', function(error) {
stream.emit('error', error);
});
request.end();
request.end();

if ( typeof callback === 'function' ) {
callback(stream);
}
};
if ( typeof callback === 'function' ) {
callback(stream);
}
};
})();


module.exports = Twitter;