From 5427ee5ad06c466140822709ce81f3a66159f4e6 Mon Sep 17 00:00:00 2001 From: danypype Date: Thu, 8 Mar 2018 17:12:03 -0500 Subject: [PATCH 1/4] Stop pushing data when push() returns false --- index.js | 20 +++++++++++++++++--- test/pauses.js | 26 ++++++++++++++++++++++++++ test/pushes.js | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 test/pushes.js diff --git a/index.js b/index.js index 9b0d161..05edea5 100644 --- a/index.js +++ b/index.js @@ -8,6 +8,7 @@ class PgQueryStream extends Readable { this.cursor = new Cursor(text, values) this._reading = false this._closed = false + this._buffer = [] this.batchSize = (options || { }).batchSize || 100 // delegate Submittable callbacks to cursor @@ -35,7 +36,17 @@ class PgQueryStream extends Readable { return false } this._reading = true - const readAmount = Math.max(size, this.batchSize) + var readAmount = Math.max(size, this.batchSize) + var object; + + while (object = this._buffer.shift()) { + readAmount--; + if (!this.push(object)) { + this._reading = false + return + } + } + this.cursor.read(readAmount, (err, rows) => { if (this._closed) { return @@ -52,8 +63,11 @@ class PgQueryStream extends Readable { // push each row into the stream this._reading = false - for (var i = 0; i < rows.length; i++) { - this.push(rows[i]) + while (object = rows.shift()) { + if (!this.push(object)) { + this._buffer = this._buffer.concat(rows) + return + } } }) } diff --git a/test/pauses.js b/test/pauses.js index 8d9beb0..7d6f1b7 100644 --- a/test/pauses.js +++ b/test/pauses.js @@ -1,6 +1,7 @@ var concat = require('concat-stream') var tester = require('stream-tester') var JSONStream = require('JSONStream') +var assert = require('assert'); var QueryStream = require('../') @@ -15,4 +16,29 @@ require('./helper')('pauses', function (client) { done() })) }) + + it('keeps a stable internal buffer size when paused/resumed', function (done) { + this.timeout(5000) + + var stream = client.query(new QueryStream('SELECT * FROM generate_series(0, $1)', [10000], {batchSize: 100})) + var results = [] + var concurrency = 50 + + stream.on('data', function (result) { + results.push(result) + + if (results.length == concurrency) { + stream.pause() + + setTimeout(function () { + results = [] + stream.resume() + }, 10) + } + + assert(stream._readableState.buffer.length <= stream.batchSize) + }) + + stream.on('end', done); + }) }) diff --git a/test/pushes.js b/test/pushes.js new file mode 100644 index 0000000..f45853f --- /dev/null +++ b/test/pushes.js @@ -0,0 +1,33 @@ +var helper = require('./helper') +var QueryStream = require('../') +var Writable = require('stream').Writable +var assert = require('assert') + +helper('pushes', function (client) { + it('stops pushing data when push() returns false and resumes on _read()', function (done) { + var readable = client.query(new QueryStream('SELECT * FROM generate_series(0, $1)', [500])) + var writable = new Writable({highWaterMark: 100, objectMode: true}) + var shouldPushAgain = true + + readable.original_read = readable._read + readable._read = function (size) { + shouldPushAgain = true + return this.original_read(size) + } + + readable.originalPush = readable.push + readable.push = function (data) { + if (!shouldPushAgain && !(shouldPushAgain = this.originalPush(data))) { + assert(false) + } else { + return (shouldPushAgain = this.originalPush(data)) + } + } + + writable._write = function (chunk, encoding, callback) { + setImmediate(callback) + } + + readable.pipe(writable).on('finish', done) + }) +}) From 6c5538a8cec1f9c05052b35012a3899768aaa24e Mon Sep 17 00:00:00 2001 From: danypype Date: Thu, 8 Mar 2018 17:48:10 -0500 Subject: [PATCH 2/4] Simplify test test/pushes.js --- test/pushes.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/pushes.js b/test/pushes.js index f45853f..6633c7e 100644 --- a/test/pushes.js +++ b/test/pushes.js @@ -17,11 +17,9 @@ helper('pushes', function (client) { readable.originalPush = readable.push readable.push = function (data) { - if (!shouldPushAgain && !(shouldPushAgain = this.originalPush(data))) { - assert(false) - } else { - return (shouldPushAgain = this.originalPush(data)) - } + assert(shouldPushAgain) + shouldPushAgain = this.originalPush(data) + return shouldPushAgain } writable._write = function (chunk, encoding, callback) { From 6af3d7b94c6428d7b8105ad820fb68db9112e430 Mon Sep 17 00:00:00 2001 From: danypype Date: Thu, 8 Mar 2018 18:12:46 -0500 Subject: [PATCH 3/4] Only read from cursor if readAmount > 0 --- index.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 05edea5..91a5276 100644 --- a/index.js +++ b/index.js @@ -39,7 +39,7 @@ class PgQueryStream extends Readable { var readAmount = Math.max(size, this.batchSize) var object; - while (object = this._buffer.shift()) { + while (object = this._buffer.shift() && readAmount) { readAmount--; if (!this.push(object)) { this._reading = false @@ -47,6 +47,11 @@ class PgQueryStream extends Readable { } } + if (!readAmount) { + this._reading = false + return + } + this.cursor.read(readAmount, (err, rows) => { if (this._closed) { return From 193f3d4162ea936449f818863640e481e69f1603 Mon Sep 17 00:00:00 2001 From: danypype Date: Thu, 8 Mar 2018 18:15:08 -0500 Subject: [PATCH 4/4] Properly shift objects from _buffer in while loop --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 91a5276..f3ee72c 100644 --- a/index.js +++ b/index.js @@ -39,7 +39,7 @@ class PgQueryStream extends Readable { var readAmount = Math.max(size, this.batchSize) var object; - while (object = this._buffer.shift() && readAmount) { + while ((object = this._buffer.shift()) && readAmount) { readAmount--; if (!this.push(object)) { this._reading = false