diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c2e863b27df179..ff7d95554acd70 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -180,7 +180,7 @@ function Readable(options) { this._destroy = options.destroy; } - Stream.call(this); + Stream.call(this, options); } ObjectDefineProperty(Readable.prototype, 'destroyed', { @@ -223,6 +223,14 @@ Readable.prototype._destroy = function(err, cb) { cb(err); }; +Readable.prototype[EE.captureRejectionSymbol] = function(err) { + // TODO(mcollina): remove the destroyed if once errorEmitted lands in + // Readable. + if (!this.destroyed) { + this.destroy(err); + } +}; + // Manually shove something into the read() buffer. // This returns true if the highWaterMark has not been hit yet, // similar to how Writable.write() returns true if you should diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e6195c60516354..34189ce91d6cb0 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -37,6 +37,7 @@ module.exports = Writable; Writable.WritableState = WritableState; const internalUtil = require('internal/util'); +const EE = require('events'); const Stream = require('stream'); const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); @@ -250,7 +251,7 @@ function Writable(options) { this._final = options.final; } - Stream.call(this); + Stream.call(this, options); } // Otherwise people can pipe Writable streams, which is just wrong. @@ -782,3 +783,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { cb(err); }; + +Writable.prototype[EE.captureRejectionSymbol] = function(err) { + this.destroy(err); +}; diff --git a/lib/internal/streams/legacy.js b/lib/internal/streams/legacy.js index 702e3c56ba6376..2bc7a86aa050b6 100644 --- a/lib/internal/streams/legacy.js +++ b/lib/internal/streams/legacy.js @@ -6,8 +6,8 @@ const { const EE = require('events'); -function Stream() { - EE.call(this); +function Stream(opts) { + EE.call(this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE); diff --git a/test/parallel/test-stream-catch-rejections.js b/test/parallel/test-stream-catch-rejections.js new file mode 100644 index 00000000000000..fb5f1fccc18bd2 --- /dev/null +++ b/test/parallel/test-stream-catch-rejections.js @@ -0,0 +1,52 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const assert = require('assert'); + +{ + const r = new stream.Readable({ + captureRejections: true, + read() { + this.push('hello'); + this.push('world'); + this.push(null); + } + }); + + const err = new Error('kaboom'); + + r.on('error', common.mustCall((_err) => { + assert.strictEqual(err, _err); + assert.strictEqual(r.destroyed, true); + })); + + r.on('data', async () => { + throw err; + }); +} + +{ + const w = new stream.Writable({ + captureRejections: true, + highWaterMark: 1, + write(chunk, enc, cb) { + cb(); + } + }); + + const err = new Error('kaboom'); + + w.write('hello', () => { + w.write('world'); + }); + + w.on('error', common.mustCall((_err) => { + assert.strictEqual(err, _err); + assert.strictEqual(w.destroyed, true); + })); + + w.on('drain', common.mustCall(async () => { + throw err; + }, 2)); +}