Skip to content

Commit

Permalink
stream: add pool option to the map operator
Browse files Browse the repository at this point in the history
Fixes: #46132
  • Loading branch information
rluvaton committed Aug 20, 2023
1 parent ecde9d9 commit 942dc4b
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 3 deletions.
86 changes: 83 additions & 3 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const { isWritable, isNodeStream } = require('internal/streams/utils');
const { createDeferredPromise } = require('internal/util');

const {
ArrayPrototypePush,
Expand All @@ -33,6 +34,7 @@ const {
Promise,
PromiseReject,
PromisePrototypeThen,
PromiseResolve,
Symbol,
} = primordials;

Expand Down Expand Up @@ -64,6 +66,32 @@ function compose(stream, options) {
return composedStream;
}

function dynamicPromiseRace(promises) {
const { promise, resolve, reject } = createDeferredPromise();

for (let i = 0; i < promises.length; i++) {
PromisePrototypeThen(PromiseResolve(promises[i]), (val) => resolve({
__proto__: null,
value: val,
i,
}), reject);
}

return {
__proto__: null,
promise,
addToRace: (newValueInQueue) => {
const i = promises.length;
PromisePrototypeThen(newValueInQueue, (value) => resolve({
__proto__: null,
value,
i,
}), reject);
},
};

}

function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand All @@ -76,12 +104,12 @@ function map(fn, options) {
validateAbortSignal(options.signal, 'options.signal');
}

let concurrency = 1;
let concurrency = options?.pool ? 2 : 1;
if (options?.concurrency != null) {
concurrency = MathFloor(options.concurrency);
}

validateInteger(concurrency, 'concurrency', 1);
validateInteger(concurrency, 'concurrency', options?.pool ? 2 : 1);

return async function* map() {
const signal = AbortSignal.any([options?.signal].filter(Boolean));
Expand All @@ -92,6 +120,7 @@ function map(fn, options) {
let next;
let resume;
let done = false;
let addToRace;

function onDone() {
done = true;
Expand Down Expand Up @@ -122,6 +151,9 @@ function map(fn, options) {
val.catch(onDone);
}

// This is needed for the pool strategy
addToRace?.(val);

queue.push(val);
if (next) {
next();
Expand Down Expand Up @@ -150,7 +182,47 @@ function map(fn, options) {

pump();

try {
async function* poolStrategy() {
while (true) {
while (queue.length > 0) {
let promise;

addToRace = null;
({ promise, addToRace } = dynamicPromiseRace(queue));

const { i, value } = await promise;

queue.splice(i, 1);

if (value === kEof) {
continue;
}

if (signal.aborted) {
throw new AbortError();
}

if (value !== kEmpty) {
yield value;
}

if (resume) {
resume();
resume = null;
}
}

if (done) {
return;
}

await new Promise((resolve) => {
next = resolve;
});
}
}

async function* queueStrategy() {
while (true) {
while (queue.length > 0) {
const val = await queue[0];
Expand Down Expand Up @@ -178,6 +250,14 @@ function map(fn, options) {
next = resolve;
});
}
}

try {
if (options?.pool) {
yield* await poolStrategy();
} else {
yield* await queueStrategy();
}
} finally {
done = true;
if (resume) {
Expand Down
140 changes: 140 additions & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

function createDependentPromises(n) {
const promiseAndResolveArray = [];

for (let i = 0; i < n; i++) {
let res;
const promise = new Promise((resolve) => {
if (i === 0) {
res = resolve;
return;
}
res = () => promiseAndResolveArray[i - 1][0].then(resolve);
});

promiseAndResolveArray.push([promise, res]);
}

return promiseAndResolveArray;
}

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
Expand Down Expand Up @@ -173,15 +192,136 @@ const { setTimeout } = require('timers/promises');
})().then(common.mustCall());
}


{
// Simple pool-based concurrency
const finishOrder = [];

const promises = createDependentPromises(4);

const raw = Readable.from([2, 0, 1, 3]);
const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 2, pool: true });

(async () => {
await stream.toArray();

assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// Pool-based concurrency with a lot of items and large concurrency
const finishOrder = [];

const promises = createDependentPromises(20);

const raw = Readable.from([11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19]);
// Should be
// 11, 1, 0, 3, 4 | next: 0
// 11, 1, 3, 4, 2 | next: 1
// 11, 3, 4, 2, 5 | next: 2
// 11, 3, 4, 5, 7 | next: 3
// 11, 4, 5, 7, 8 | next: 4
// 11, 5, 7, 8, 9 | next: 5
// 11, 7, 8, 9, 6 | next: 6
// 11, 7, 8, 9, 10 | next: 7
// 11, 8, 9, 10, 12 | next: 8
// 11, 9, 10, 12, 13 | next: 9
// 11, 10, 12, 13, 18 | next: 10
// 11, 12, 13, 18, 15 | next: 11
// 12, 13, 18, 15, 16 | next: 12
// 13, 18, 15, 16, 17 | next: 13
// 18, 15, 16, 17, 14 | next: 14
// 18, 15, 16, 17, 19 | next: 15
// 18, 16, 17, 19 | next: 16
// 18, 17, 19 | next: 17
// 18, 19 | next: 18
// 19 | next: 19
//

const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 5, pool: true });

(async () => {
await stream.toArray();

assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// Pool-based concurrency where there is a delay between the first and the next item in the pool
const finishOrder = [];

const raw = Readable.from((async function *() {
yield 200;

// Making sure the first item (200) finish before the next item (0) starts
// this is to make sure we don't wait for the pool to be filled before starting
await setTimeout(500);

yield 0;
yield 1;
})());

let start;
let delaysBetweenFirstAndNextPoolItem;

const stream = raw
.map(async (item) => {
await setTimeout(item);
finishOrder.push(item);

return item;
}, { concurrency: 2, pool: true })
.map((item) => {
if (item === 200) {
start = Date.now();
}

if (item === 0) {
delaysBetweenFirstAndNextPoolItem = Date.now() - start;
}

return item;
});

(async () => {
await stream.toArray();

assert.deepStrictEqual(finishOrder, [200, 0, 1]);
// More than 250ms because the first item waits for 200ms and the next item have delay of 500ms
assert.ok(delaysBetweenFirstAndNextPoolItem > 250, new Error(`delay between first and next item in the pool should be more than 250ms but instead got ${delaysBetweenFirstAndNextPoolItem}`));
})().then(common.mustCall(), common.mustNotCall());
}

{
// Error cases
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, {
concurrency: 1,
pool: true
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
// Test result is a Readable
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
Expand Down

0 comments on commit 942dc4b

Please sign in to comment.