Skip to content

Commit

Permalink
stream: use finished for pump
Browse files Browse the repository at this point in the history
Re-use existing compat logic for pump by using
finished.

PR-URL: #39203
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag authored and targos committed Jul 11, 2021
1 parent dfe5d11 commit 35b6669
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 73 deletions.
72 changes: 32 additions & 40 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@ const {
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE,
},
} = require('internal/errors');

const { validateCallback } = require('internal/validators');

function noop() {}

const {
isIterable,
isReadable,
isStream,
} = require('internal/streams/utils');
const assert = require('internal/assert');

let PassThrough;
let Readable;
Expand Down Expand Up @@ -109,62 +105,58 @@ async function* fromReadable(val) {

async function pump(iterable, writable, finish) {
let error;
let callback = noop;
let onresolve = null;

const resume = (err) => {
error = aggregateTwoErrors(error, err);
const _callback = callback;
callback = noop;
_callback();
};
const onClose = () => {
resume(new ERR_STREAM_PREMATURE_CLOSE());
if (err) {
error = err;
}

if (onresolve) {
const callback = onresolve;
onresolve = null;
callback();
}
};

const waitForDrain = () => new Promise((resolve) => {
assert(callback === noop);
if (error || writable.destroyed) {
resolve();
const wait = () => new Promise((resolve, reject) => {
if (error) {
reject(error);
} else {
callback = resolve;
onresolve = () => {
if (error) {
reject(error);
} else {
resolve();
}
};
}
});

writable
.on('drain', resume)
.on('error', resume)
.on('close', onClose);
writable.on('drain', resume);
const cleanup = eos(writable, { readable: false }, resume);

try {
if (writable.writableNeedDrain) {
await waitForDrain();
}

if (error) {
return;
await wait();
}

for await (const chunk of iterable) {
if (!writable.write(chunk)) {
await waitForDrain();
await wait();
}
if (error) {
return;
}
}

if (error) {
return;
}

writable.end();

await wait();

finish();
} catch (err) {
error = aggregateTwoErrors(error, err);
finish(error !== err ? aggregateTwoErrors(error, err) : err);
} finally {
writable
.off('drain', resume)
.off('error', resume)
.off('close', onClose);
finish(error);
cleanup();
writable.off('drain', resume);
}
}

Expand Down
33 changes: 0 additions & 33 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1387,36 +1387,3 @@ const net = require('net');
assert.strictEqual(res, content);
}));
}

{
const writableLike = new Stream();
writableLike.writableNeedDrain = true;

pipeline(
async function *() {},
writableLike,
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
})
);

writableLike.emit('close');
}

{
const writableLike = new Stream();
writableLike.write = () => false;

pipeline(
async function *() {
yield null;
yield null;
},
writableLike,
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
})
);

writableLike.emit('close');
}

0 comments on commit 35b6669

Please sign in to comment.