Skip to content

Commit

Permalink
test: tests for _readableStream.awaitDrain
Browse files Browse the repository at this point in the history
Fixes: nodejs#8684
PR-URL: nodejs#8914
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
shmuga authored and italoacasas committed Jan 25, 2017
1 parent 4b1bd4e commit c55d83e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
16 changes: 16 additions & 0 deletions test/parallel/test-stream-pipe-await-drain-manual-resume.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// A consumer stream with a very low highWaterMark, which starts in a state
// where it buffers the chunk it receives rather than indicating that they
Expand All @@ -26,6 +27,11 @@ const readable = new stream.Readable({
readable.pipe(writable);

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'awaitDrain doesn\'t increase'
);
// First pause, resume manually. The next write() to writable will still
// return false, because chunks are still being buffered, so it will increase
// the awaitDrain counter again.
Expand All @@ -34,6 +40,11 @@ readable.once('pause', common.mustCall(() => {
}));

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'.resume() does not reset counter'
);
// Second pause, handle all chunks from now on. Once all callbacks that
// are currently queued up are handled, the awaitDrain drain counter should
// fall back to 0 and all chunks that are pending on the readable side
Expand All @@ -50,5 +61,10 @@ readable.push(Buffer.alloc(100)); // Should get through to the writable.
readable.push(null);

writable.on('finish', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
0,
'awaitDrain not 0 after all chunks are written'
);
// Everything okay, all chunks were written.
}));
22 changes: 20 additions & 2 deletions test/parallel/test-stream-pipe-await-drain-push-while-write.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

const awaitDrainStates = [
1, // after first chunk before callback
1, // after second chunk before callback
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
];

// A writable stream which pushes data onto the stream which pipes into it,
// but only the first time it's written to. Since it's not paused at this time,
// a second write will occur. If the pipe increases awaitDrain twice, we'll
// never get subsequent chunks because 'drain' is only emitted once.
const writable = new stream.Writable({
write: common.mustCall((chunk, encoding, cb) => {
write: common.mustCall(function(chunk, encoding, cb) {
if (chunk.length === 32 * 1024) { // first chunk
readable.push(new Buffer(33 * 1024)); // above hwm
const beforePush = readable._readableState.awaitDrain;
readable.push(new Buffer(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased.
const afterPush = readable._readableState.awaitDrain;
assert.strictEqual(afterPush - beforePush, 1,
'Counter is not increased for awaitDrain');
}

assert.strictEqual(
awaitDrainStates.shift(),
readable._readableState.awaitDrain,
'State variable awaitDrain is not correct.'
);
cb();
}, 3)
});
Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-stream-pipe-await-drain.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// This is very similar to test-stream-pipe-cleanup-pause.js.

const reader = new stream.Readable();
const writer1 = new stream.Writable();
const writer2 = new stream.Writable();
const writer3 = new stream.Writable();

// 560000 is chosen here because it is larger than the (default) highWaterMark
// and will cause `.write()` to return false
Expand All @@ -19,7 +21,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
this.emit('chunk-received');
cb();
}, 1);

writer1.once('chunk-received', function() {
assert.strictEqual(reader._readableState.awaitDrain, 0,
'initial value is not 0');
setImmediate(function() {
// This one should *not* get through to writer1 because writer2 is not
// "done" processing.
Expand All @@ -29,12 +34,26 @@ writer1.once('chunk-received', function() {

// A "slow" consumer:
writer2._write = common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
reader._readableState.awaitDrain, 1,
'awaitDrain isn\'t 1 after first push'
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
// will return false.
}, 1);

writer3._write = common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
reader._readableState.awaitDrain, 2,
'awaitDrain isn\'t 2 after second push'
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
// will return false.
}, 1);

reader.pipe(writer1);
reader.pipe(writer2);
reader.pipe(writer3);
reader.push(buffer);

0 comments on commit c55d83e

Please sign in to comment.