From 32c51f10d33c686f2607559ff24a97d6c4a6f109 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Thu, 31 May 2018 12:11:22 +0200 Subject: [PATCH] stream: make the pipeline callback mandatory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Right now when not adding a callback to the pipeline it could cause an uncaught exception if there is an error. Instead, just make the callback mandatory as mostly done in all other Node.js callback APIs so users explicitly have to decide what to do in such situations. PR-URL: /~https://github.com/nodejs/node/pull/21054 Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen Reviewed-By: Michaƫl Zasso Reviewed-By: Trivikram Kamat Reviewed-By: James M Snell --- doc/api/stream.md | 6 +++--- lib/internal/streams/pipeline.js | 13 ++++++------- test/parallel/test-stream-pipeline.js | 19 +++++-------------- 3 files changed, 14 insertions(+), 24 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 014b2f68b3359b..09ff2c02b1885b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1340,14 +1340,14 @@ run().catch(console.error); rs.resume(); // drain the stream ``` -### stream.pipeline(...streams[, callback]) +### stream.pipeline(...streams, callback) * `...streams` {Stream} Two or more streams to pipe between. -* `callback` {Function} A callback function that takes an optional error - argument. +* `callback` {Function} Called when the pipeline is fully done. + * `err` {Error} A module method to pipe between streams forwarding errors and properly cleaning up and provide a callback when the pipeline is complete. diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 849b3d39dbe25b..caa4042339bd37 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,6 +6,7 @@ let eos; const { + ERR_INVALID_CALLBACK, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED } = require('internal/errors').codes; @@ -19,11 +20,6 @@ function once(callback) { }; } -function noop(err) { - // Rethrow the error if it exists to avoid swallowing it - if (err) throw err; -} - function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -66,8 +62,11 @@ function pipe(from, to) { } function popCallback(streams) { - if (!streams.length) return noop; - if (typeof streams[streams.length - 1] !== 'function') return noop; + // Streams should never be an empty array. It should always contain at least + // a single stream. Therefore optimize for the average case instead of + // checking for length === 0 as well. + if (typeof streams[streams.length - 1] !== 'function') + throw new ERR_INVALID_CALLBACK(); return streams.pop(); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 12733d88a7ac85..f735054e88e9b8 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -60,7 +60,7 @@ common.crashOnUnhandledRejection(); }, /ERR_MISSING_ARGS/); assert.throws(() => { pipeline(); - }, /ERR_MISSING_ARGS/); + }, /ERR_INVALID_CALLBACK/); } { @@ -493,17 +493,8 @@ common.crashOnUnhandledRejection(); } }); - read.on('close', common.mustCall()); - transform.on('close', common.mustCall()); - write.on('close', common.mustCall()); - - process.on('uncaughtException', common.mustCall((err) => { - assert.deepStrictEqual(err, new Error('kaboom')); - })); - - const dst = pipeline(read, transform, write); - - assert.strictEqual(dst, write); - - read.push('hello'); + assert.throws( + () => pipeline(read, transform, write), + { code: 'ERR_INVALID_CALLBACK' } + ); }