diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 09cc5590fad4fc..0a82d50c12cb9d 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -267,7 +267,7 @@ function constructNT(stream) { } else if (err) { errorOrDestroy(stream, err, true); } else { - process.nextTick(emitConstructNT, stream); + emitConstructNT(stream); } } diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 849cfb3538b306..3259446590e3b1 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -23,6 +23,7 @@ const common = require('../common'); const assert = require('assert'); const { PassThrough, Transform } = require('stream'); +const { Readable } = require('node:stream'); { // Verify writable side consumption @@ -468,3 +469,57 @@ const { PassThrough, Transform } = require('stream'); assert.strictEqual(ended, true); })); } + +{ + const createInnerTransform = () => new Transform({ + objectMode: true, + + construct(callback) { + this.push('header from constructor'); + callback(); + }, + + transform: (row, encoding, callback) => { + callback(null, 'transform | ' + row); + }, + }); + + const createOuterTransform = () => { + let innerTransform; + + return new Transform({ + objectMode: true, + + transform(row, encoding, callback) { + if (!innerTransform) { + innerTransform = createInnerTransform(); + innerTransform.on('data', (data) => { + this.push(data); + }); + + callback(); + } else if (innerTransform.write('outer | ' + row)) { + process.nextTick(callback); + } else { + innerTransform.once('drain', callback); + } + }, + }); + }; + + Readable.from([ + 'create InnerTransform', + 'firstLine', + 'secondLine', + ]) + .compose(createOuterTransform()) + .toArray().then( + common.mustCall((value) => { + assert.deepStrictEqual(value, [ + 'header from constructor', + 'transform | outer | firstLine', + 'transform | outer | secondLine', + ]); + }), + ); +}