From c5639771896196c70b3fc4ea4f205d430697cd1a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 28 Jun 2020 17:44:07 +0200 Subject: [PATCH 1/2] stream: fix writable.end callback behavior Changes so that the end() callback behaves the same way in relation to _final as write() does to _write/_writev. --- doc/api/stream.md | 8 ++- lib/_stream_writable.js | 53 +++++++++---------- .../test-stream-transform-final-sync.js | 4 +- test/parallel/test-stream-transform-final.js | 4 +- test/parallel/test-stream-writable-destroy.js | 2 +- .../test-stream-writable-end-cb-error.js | 4 +- .../test-stream-writable-end-cb-uncaught.js | 2 +- test/parallel/test-stream-write-destroy.js | 2 +- 8 files changed, 37 insertions(+), 42 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 588e7bdfd3a7a2..0827143065ff44 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -428,15 +428,13 @@ changes: `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value other than `null`. * `encoding` {string} The encoding if `chunk` is a string -* `callback` {Function} Optional callback for when the stream finishes - or errors +* `callback` {Function} Callback for when the stream is finished. * Returns: {this} Calling the `writable.end()` method signals that no more data will be written to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one final additional chunk of data to be written immediately before closing the -stream. If provided, the optional `callback` function is attached as a listener -for the [`'finish'`][] and the `'error'` event. +stream. Calling the [`stream.write()`][stream-write] method after calling [`stream.end()`][stream-end] will raise an error. @@ -592,7 +590,7 @@ changes: `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value other than `null`. * `encoding` {string} The encoding, if `chunk` is a string. **Default:** `'utf8'` -* `callback` {Function} Callback for when this chunk of data is flushed +* `callback` {Function} Callback for when this chunk of data is flushed. * Returns: {boolean} `false` if the stream wishes for the calling code to wait for the `'drain'` event to be emitted before continuing to write additional data; otherwise `true`. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 916e9b87d9c17a..65a7fe9f9f0984 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream); function nop() {} +const kOnFinished = Symbol('kOnFinished'); + function WritableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. @@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex) { // True if close has been emitted or would have been emitted // depending on emitClose. this.closeEmitted = false; + + this[kOnFinished] = []; } function resetBuffer(state) { @@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb) { // not enabled. Passing `er` here doesn't make sense since // it's related to one specific write, not to the buffered // writes. - errorBuffer(state, new ERR_STREAM_DESTROYED('write')); + errorBuffer(state); // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); } @@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb) { } if (state.destroyed) { - errorBuffer(state, new ERR_STREAM_DESTROYED('write')); + errorBuffer(state); } finishMaybe(stream, state); } // If there's something in the buffer waiting, then invoke callbacks. -function errorBuffer(state, err) { +function errorBuffer(state) { if (state.writing) { return; } @@ -503,7 +507,11 @@ function errorBuffer(state, err) { const { chunk, callback } = state.buffered[n]; const len = state.objectMode ? 1 : chunk.length; state.length -= len; - callback(err); + callback(new ERR_STREAM_DESTROYED('write')); + } + + for (const callback of state[kOnFinished].splice(0)) { + callback(new ERR_STREAM_DESTROYED('end')); } resetBuffer(state); @@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb) { } if (typeof cb === 'function') { - if (err || state.finished) + if (err || state.finished) { process.nextTick(cb, err); - else - onFinished(this, cb); + } else { + state[kOnFinished].push(cb); + } } return this; @@ -636,6 +645,9 @@ function callFinal(stream, state) { stream._final((err) => { state.pendingcb--; if (err) { + for (const callback of state[kOnFinished].splice(0)) { + callback(err); + } errorOrDestroy(stream, err, state.sync); } else if (needFinish(state)) { state.prefinished = true; @@ -683,6 +695,11 @@ function finish(stream, state) { return; state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + stream.emit('finish'); if (state.autoDestroy) { @@ -701,26 +718,6 @@ function finish(stream, state) { } } -// TODO(ronag): Avoid using events to implement internal logic. -function onFinished(stream, cb) { - function onerror(err) { - stream.removeListener('finish', onfinish); - stream.removeListener('error', onerror); - cb(err); - if (stream.listenerCount('error') === 0) { - stream.emit('error', err); - } - } - - function onfinish() { - stream.removeListener('finish', onfinish); - stream.removeListener('error', onerror); - cb(); - } - stream.on('finish', onfinish); - stream.prependListener('error', onerror); -} - ObjectDefineProperties(Writable.prototype, { destroyed: { @@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { - process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); + process.nextTick(errorBuffer, state); } destroy.call(this, err, cb); return this; diff --git a/test/parallel/test-stream-transform-final-sync.js b/test/parallel/test-stream-transform-final-sync.js index 4faf1b067627ad..f8465c8929631c 100644 --- a/test/parallel/test-stream-transform-final-sync.js +++ b/test/parallel/test-stream-transform-final-sync.js @@ -90,7 +90,7 @@ const t = new stream.Transform({ t.on('finish', common.mustCall(function() { state++; // finishListener - assert.strictEqual(state, 14); + assert.strictEqual(state, 15); }, 1)); t.on('end', common.mustCall(function() { state++; @@ -106,5 +106,5 @@ t.write(4); t.end(7, common.mustCall(function() { state++; // endMethodCallback - assert.strictEqual(state, 15); + assert.strictEqual(state, 14); }, 1)); diff --git a/test/parallel/test-stream-transform-final.js b/test/parallel/test-stream-transform-final.js index 19af744a6bb33e..dd6cc3b427d6b7 100644 --- a/test/parallel/test-stream-transform-final.js +++ b/test/parallel/test-stream-transform-final.js @@ -92,7 +92,7 @@ const t = new stream.Transform({ t.on('finish', common.mustCall(function() { state++; // finishListener - assert.strictEqual(state, 14); + assert.strictEqual(state, 15); }, 1)); t.on('end', common.mustCall(function() { state++; @@ -108,5 +108,5 @@ t.write(4); t.end(7, common.mustCall(function() { state++; // endMethodCallback - assert.strictEqual(state, 15); + assert.strictEqual(state, 14); }, 1)); diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 706847a8582d0c..80b51a50c52555 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -354,7 +354,7 @@ const assert = require('assert'); assert.strictEqual(err.message, 'asd'); })); write.end('asd', common.mustCall((err) => { - assert.strictEqual(err.message, 'asd'); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); })); write.destroy(new Error('asd')); } diff --git a/test/parallel/test-stream-writable-end-cb-error.js b/test/parallel/test-stream-writable-end-cb-error.js index 24989a6d06a111..8f6d209954436f 100644 --- a/test/parallel/test-stream-writable-end-cb-error.js +++ b/test/parallel/test-stream-writable-end-cb-error.js @@ -17,10 +17,10 @@ const stream = require('stream'); })); writable.write('asd'); writable.end(common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); })); writable.end(common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); })); } diff --git a/test/parallel/test-stream-writable-end-cb-uncaught.js b/test/parallel/test-stream-writable-end-cb-uncaught.js index ab25cac81b0bee..5c1753aa067342 100644 --- a/test/parallel/test-stream-writable-end-cb-uncaught.js +++ b/test/parallel/test-stream-writable-end-cb-uncaught.js @@ -19,5 +19,5 @@ writable._final = (cb) => { writable.write('asd'); writable.end(common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); })); diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js index 297217eb4accc6..1acf45a9ab2781 100644 --- a/test/parallel/test-stream-write-destroy.js +++ b/test/parallel/test-stream-write-destroy.js @@ -57,7 +57,7 @@ for (const withPendingData of [ false, true ]) { w.destroy(); assert.strictEqual(chunksWritten, 1); callbacks.shift()(); - assert.strictEqual(chunksWritten, 2); + assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2); assert.strictEqual(callbacks.length, 0); assert.strictEqual(drains, 1); From 5d901cba6268a83ed41915dca6f45e127891ded6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 29 Jun 2020 23:34:07 +0200 Subject: [PATCH 2/2] fixup: add changes --- doc/api/stream.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 0827143065ff44..003389502aeac1 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -412,6 +412,9 @@ Is `true` after [`writable.destroy()`][writable-destroy] has been called.