diff --git a/.gitignore b/.gitignore index b04ea25d91b8..60ba0f22d861 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.idea .DS_STORE .eslintcache *.swp diff --git a/CHANGELOG.md b/CHANGELOG.md index 925911127054..37cf215683c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ - `[jest-haste-map]` Accept a `getCacheKey` method in `hasteImplModulePath` modules to reset the cache when the logic changes ([#7350](/~https://github.com/facebook/jest/pull/7350)) - `[jest-config]` Add `haste.computeSha1` option to compute the sha-1 of the files in the haste map ([#7345](/~https://github.com/facebook/jest/pull/7345)) - `[expect]` `expect(Infinity).toBeCloseTo(Infinity)` Treats `Infinity` as equal in toBeCloseTo matcher ([#7405](/~https://github.com/facebook/jest/pull/7405)) +- `[jest-worker]` Add node worker-thread support to jest-worker ([#7408](/~https://github.com/facebook/jest/pull/7408)) ### Fixes diff --git a/jest-worker b/jest-worker new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/packages/jest-resolve/src/isBuiltinModule.js b/packages/jest-resolve/src/isBuiltinModule.js index 67762edc1ee8..48378729f34b 100644 --- a/packages/jest-resolve/src/isBuiltinModule.js +++ b/packages/jest-resolve/src/isBuiltinModule.js @@ -15,11 +15,13 @@ declare var process: { binding(type: string): {}, }; +const EXPERIMENTAL_MODULES = ['worker_threads']; + const BUILTIN_MODULES = - builtinModules || - Object.keys(process.binding('natives')).filter( - (module: string) => !/^internal\//.test(module), - ); + builtinModules.concat(EXPERIMENTAL_MODULES) || + Object.keys(process.binding('natives')) + .filter((module: string) => !/^internal\//.test(module)) + .concat(EXPERIMENTAL_MODULES); export default function isBuiltinModule(module: string): boolean { return BUILTIN_MODULES.indexOf(module) !== -1; diff --git a/packages/jest-worker/README.md b/packages/jest-worker/README.md index 3c98dfdb2b5e..c5de73619559 100644 --- a/packages/jest-worker/README.md +++ b/packages/jest-worker/README.md @@ -39,6 +39,12 @@ export function hello(param) { } ``` +## Experimental worker + +Node 10 shipped with [worker-threads](https://nodejs.org/api/worker_threads.html), a "threading API" that uses SharedArrayBuffers to communicate between the main process and its child threads. This experimental Node feature can significantly improve the communication time between parent and child processes in `jest-worker`. + +We will use worker threads where available. To enable in Node 10+, run the Node process with the `--experimental-worker` flag. + ## API The only exposed method is a constructor (`Worker`) that is initialized by passing the worker path, plus an options object. @@ -77,6 +83,12 @@ By default, no process is bound to any worker. The arguments that will be passed to the `setup` method during initialization. +#### `workerPool: (workerPath: string, options?: WorkerPoolOptions) => WorkerPoolInterface` (optional) + +Provide a custom worker pool to be used for spawning child processes. By default, Jest will use a node thread pool if available and fall back to child process threads. + +The arguments that will be passed to the `setup` method during initialization. + ## Worker The returned `Worker` instance has all the exposed methods, plus some additional ones to interact with the workers itself: diff --git a/packages/jest-worker/src/Farm.js b/packages/jest-worker/src/Farm.js new file mode 100644 index 000000000000..b69846ceafa0 --- /dev/null +++ b/packages/jest-worker/src/Farm.js @@ -0,0 +1,160 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import type { + ChildMessage, + QueueChildMessage, + WorkerInterface, + OnStart, + OnEnd, +} from './types'; +import {CHILD_MESSAGE_CALL} from './types'; + +export default class Farm { + _computeWorkerKey: (string, ...Array) => ?string; + _cacheKeys: {[string]: WorkerInterface, __proto__: null}; + _callback: Function; + _last: Array; + _locks: Array; + _numOfWorkers: number; + _offset: number; + _queue: Array; + + constructor( + numOfWorkers: number, + callback: Function, + computeWorkerKey?: (string, ...Array) => ?string, + ) { + this._callback = callback; + this._numOfWorkers = numOfWorkers; + this._cacheKeys = Object.create(null); + this._queue = []; + this._last = []; + this._locks = []; + this._offset = 0; + if (computeWorkerKey) { + this._computeWorkerKey = computeWorkerKey; + } + } + + doWork(method: string, ...args: Array): Promise { + return new Promise((resolve, reject) => { + const computeWorkerKey = this._computeWorkerKey; + const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args]; + + let worker: ?WorkerInterface = null; + let hash: ?string = null; + + if (computeWorkerKey) { + hash = computeWorkerKey.apply(this, [method].concat(args)); + worker = hash == null ? null : this._cacheKeys[hash]; + } + + const onStart: OnStart = (worker: WorkerInterface) => { + if (hash != null) { + this._cacheKeys[hash] = worker; + } + }; + + const onEnd: OnEnd = (error: ?Error, result: ?mixed) => { + if (error) { + reject(error); + } else { + resolve(result); + } + }; + + const task = {onEnd, onStart, request}; + if (worker) { + this._enqueue(task, worker.getWorkerId()); + } else { + this._push(task); + } + }); + } + + _getNextJob(workerId: number): ?QueueChildMessage { + let queueHead = this._queue[workerId]; + + while (queueHead && queueHead.request[1]) { + queueHead = queueHead.next; + } + + this._queue[workerId] = queueHead; + + return queueHead; + } + + _process(workerId: number): Farm { + if (this.isLocked(workerId)) { + return this; + } + + const job = this._getNextJob(workerId); + + if (!job) { + return this; + } + + const onEnd = (error: ?Error, result: mixed) => { + job.onEnd(error, result); + this.unlock(workerId); + this._process(workerId); + }; + + this.lock(workerId); + + this._callback(workerId, job.request, job.onStart, onEnd); + + job.request[1] = true; + + return this; + } + + _enqueue(task: QueueChildMessage, workerId: number): Farm { + if (task.request[1]) { + return this; + } + + if (this._queue[workerId]) { + this._last[workerId].next = task; + } else { + this._queue[workerId] = task; + } + + this._last[workerId] = task; + this._process(workerId); + + return this; + } + + _push(task: QueueChildMessage): Farm { + for (let i = 0; i < this._numOfWorkers; i++) { + const workerIdx = (this._offset + i) % this._numOfWorkers; + this._enqueue(task, workerIdx); + } + this._offset++; + + return this; + } + + lock(workerId: number): void { + this._locks[workerId] = true; + } + + unlock(workerId: number): void { + this._locks[workerId] = false; + } + + isLocked(workerId: number): boolean { + return this._locks[workerId]; + } +} diff --git a/packages/jest-worker/src/WorkerPool.js b/packages/jest-worker/src/WorkerPool.js new file mode 100644 index 000000000000..adb93a62e7c6 --- /dev/null +++ b/packages/jest-worker/src/WorkerPool.js @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import BaseWorkerPool from './base/BaseWorkerPool'; + +import type { + ChildMessage, + WorkerOptions, + OnStart, + OnEnd, + WorkerPoolInterface, + WorkerInterface, +} from './types'; + +const canUseWorkerThreads = () => { + try { + // $FlowFixMe: Flow doesn't know about experimental APIs + require('worker_threads'); + return true; + } catch (_) { + return false; + } +}; + +class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface { + send( + workerId: number, + request: ChildMessage, + onStart: OnStart, + onEnd: OnEnd, + ): void { + this.getWorkerById(workerId).send(request, onStart, onEnd); + } + + createWorker(workerOptions: WorkerOptions): WorkerInterface { + let Worker; + if (canUseWorkerThreads()) { + Worker = require('./workers/NodeThreadsWorker').default; + } else { + Worker = require('./workers/ChildProcessWorker').default; + } + + return new Worker(workerOptions); + } +} + +export default WorkerPool; diff --git a/packages/jest-worker/src/__performance_tests__/test.js b/packages/jest-worker/src/__performance_tests__/test.js index 6bccfe6ade86..b8733fb46380 100644 --- a/packages/jest-worker/src/__performance_tests__/test.js +++ b/packages/jest-worker/src/__performance_tests__/test.js @@ -1,4 +1,9 @@ -// Copyright (c) 2014-present, Facebook, Inc. All rights reserved. +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ 'use strict'; diff --git a/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js b/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js index 532c3ce59ac3..5dc97a78d789 100644 --- a/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js +++ b/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js @@ -1,4 +1,9 @@ -// Copyright (c) 2014-present, Facebook, Inc. All rights reserved. +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ 'use strict'; diff --git a/packages/jest-worker/src/__performance_tests__/workers/pi.js b/packages/jest-worker/src/__performance_tests__/workers/pi.js index 740e1da31633..5d43441c111e 100644 --- a/packages/jest-worker/src/__performance_tests__/workers/pi.js +++ b/packages/jest-worker/src/__performance_tests__/workers/pi.js @@ -1,4 +1,9 @@ -// Copyright (c) 2014-present, Facebook, Inc. All rights reserved. +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ 'use strict'; diff --git a/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js b/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js index cfbd346b55ea..ae93de3317f3 100644 --- a/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js +++ b/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js @@ -1,4 +1,9 @@ -// Copyright (c) 2014-present, Facebook, Inc. All rights reserved. +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ 'use strict'; diff --git a/packages/jest-worker/src/__tests__/Farm.test.js b/packages/jest-worker/src/__tests__/Farm.test.js new file mode 100644 index 000000000000..43a2b25c47c0 --- /dev/null +++ b/packages/jest-worker/src/__tests__/Farm.test.js @@ -0,0 +1,272 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import Farm from '../Farm'; + +let mockWorkerCalls; +let callback; + +function workerReplyStart(i) { + mockWorkerCalls[i].onStart({getWorkerId: () => mockWorkerCalls[i].workerId}); +} + +function workerReplyEnd(i, error, result) { + mockWorkerCalls[i].onEnd(error, result); +} + +function workerReply(i, error, result) { + workerReplyStart(i); + workerReplyEnd(i, error, result); +} + +describe('Farm', () => { + beforeEach(() => { + mockWorkerCalls = []; + callback = jest.fn((...args) => { + mockWorkerCalls.push({ + onEnd: args[3], + onStart: args[2], + passed: args[1], + workerId: args[0], + }); + }); + }); + + it('sends a request to one worker', () => { + const farm = new Farm(4, callback); + + farm.doWork('foo', 42); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith( + 0, + [1, true, 'foo', [42]], + expect.any(Function), + expect.any(Function), + ); + }); + + it('sends four requests to four unique workers', () => { + const farm = new Farm(4, callback); + + farm.doWork('foo', 42); + farm.doWork('foo1', 43); + farm.doWork('foo2', 44); + farm.doWork('foo3', 45); + + expect(callback).toHaveBeenCalledTimes(4); + expect(callback).toHaveBeenNthCalledWith( + 1, + 0, // first worker + [1, true, 'foo', [42]], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 2, + 1, // second worker + [1, true, 'foo1', [43]], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 3, + 2, // third worker + [1, true, 'foo2', [44]], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 4, + 3, // fourth worker + [1, true, 'foo3', [45]], + expect.any(Function), + expect.any(Function), + ); + }); + + it('handles null computeWorkerKey, sending to first worker', async () => { + const computeWorkerKey = jest.fn(() => null); + + const farm = new Farm(4, callback, computeWorkerKey); + + const p0 = farm.doWork('foo', 42); + workerReply(0); + await p0; + + expect(computeWorkerKey).toBeCalledTimes(1); + expect(computeWorkerKey).toHaveBeenNthCalledWith(1, 'foo', 42); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenNthCalledWith( + 1, + 0, // first worker + [1, true, 'foo', [42]], + expect.any(Function), + expect.any(Function), + ); + }); + + it('sends the same worker key to the same worker', async () => { + const computeWorkerKey = jest + .fn(() => {}) + .mockReturnValueOnce('one') + .mockReturnValueOnce('two') + .mockReturnValueOnce('one'); + + const farm = new Farm(4, callback, computeWorkerKey); + + const p0 = farm.doWork('foo', 42); + workerReply(0); + await p0; + + const p1 = farm.doWork('foo1', 43); + workerReply(1); + await p1; + + const p2 = farm.doWork('foo2', 44); + workerReply(2); + await p2; + + expect(computeWorkerKey).toBeCalledTimes(3); + expect(computeWorkerKey).toHaveBeenNthCalledWith(1, 'foo', 42); + expect(computeWorkerKey).toHaveBeenNthCalledWith(2, 'foo1', 43); + expect(computeWorkerKey).toHaveBeenNthCalledWith(3, 'foo2', 44); + + expect(callback).toHaveBeenCalledTimes(3); + expect(callback).toHaveBeenNthCalledWith( + 1, + 0, // first worker + [1, true, 'foo', [42]], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 2, + 1, // second worker + [1, true, 'foo1', [43]], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 3, + 0, // first worker again + [1, true, 'foo2', [44]], + expect.any(Function), + expect.any(Function), + ); + }); + + it('returns the result if the call worked', async () => { + const farm = new Farm(4, callback); + + const promise = farm.doWork('car', 'plane'); + + workerReply(0, null, 34); + const result = await promise; + + expect(result).toEqual(34); + }); + + it('throws if the call failed', async () => { + const farm = new Farm(4, callback); + + const promise = farm.doWork('car', 'plane'); + let error = null; + + workerReply(0, new TypeError('Massively broken')); + + try { + await promise; + } catch (err) { + error = err; + } + + expect(error).not.toBe(null); + expect(error).toBeInstanceOf(TypeError); + }); + + it('checks that once a sticked task finishes, next time is sent to that worker', async () => { + const farm = new Farm(4, callback, () => '1234567890abcdef'); + + // Worker 1 successfully replies with "17" as a result. + const p0 = farm.doWork('car', 'plane'); + workerReply(0, null, 17); + await p0; + + // Note that the stickiness is not created by the method name or the arguments + // it is solely controlled by the provided "computeWorkerKey" method, which in + // the test example always returns the same key, so all calls should be + // redirected to worker 1 (which is the one that resolved the first call). + const p1 = farm.doWork('foo', 'bar'); + workerReply(1, null, 17); + await p1; + + // The first time, a call with a "1234567890abcdef" hash had never been done + // earlier ("foo" call), so it got queued to all workers. Later, since the one + // that resolved the call was the one in position 1, all subsequent calls are + // only redirected to that worker. + expect(callback).toHaveBeenCalledTimes(2); // Only "foo". + expect(callback).toHaveBeenNthCalledWith( + 1, + 0, // first worker + [1, true, 'car', ['plane']], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 2, + 0, // first worker + [1, true, 'foo', ['bar']], + expect.any(Function), + expect.any(Function), + ); + }); + + it('checks that even before a sticked task finishes, next time is sent to that worker', async () => { + const farm = new Farm(4, callback, () => '1234567890abcdef'); + + // Note that the worker is sending a start response synchronously. + const p0 = farm.doWork('car', 'plane'); + workerReplyStart(0); + + // Note that the worker is sending a start response synchronously. + const p1 = farm.doWork('foo', 'bar'); + + // The first call is sent the the worker, the second is queued + expect(callback).toHaveBeenCalledTimes(1); + + // Flush the queue + workerReplyEnd(0, null, 17); + await p0; + workerReply(1, null, 17); + await p1; + + // Both requests are send to the same worker + // The first time, a call with a "1234567890abcdef" hash had never been done + // earlier ("foo" call), so it got queued to all workers. Later, since the one + // that resolved the call was the one in position 1, all subsequent calls are + // only redirected to that worker. + expect(callback).toHaveBeenCalledTimes(2); + expect(callback).toHaveBeenNthCalledWith( + 1, + 0, // first worker + [1, true, 'car', ['plane']], + expect.any(Function), + expect.any(Function), + ); + expect(callback).toHaveBeenNthCalledWith( + 2, + 0, // first worker + [1, true, 'foo', ['bar']], + expect.any(Function), + expect.any(Function), + ); + }); +}); diff --git a/packages/jest-worker/src/__tests__/WorkerPool.test.js b/packages/jest-worker/src/__tests__/WorkerPool.test.js new file mode 100644 index 000000000000..51898348c2bd --- /dev/null +++ b/packages/jest-worker/src/__tests__/WorkerPool.test.js @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import WorkerPool from '../WorkerPool'; + +jest.mock('../workers/ChildProcessWorker', () => { + const fakeClass = jest.fn(() => ({ + getStderr: jest.fn(), + getStdout: jest.fn(), + send: jest.fn(), + })); + + return { + __esModule: true, + default: fakeClass, + }; +}); + +jest.mock('../workers/NodeThreadsWorker', () => { + const fakeClass = jest.fn(() => ({ + getStderr: jest.fn(), + getStdout: jest.fn(), + send: jest.fn(), + })); + + return { + __esModule: true, + default: fakeClass, + }; +}); + +const ChildProcessWorker = require('../workers/ChildProcessWorker').default; +const NodeThreadWorker = require('../workers/NodeThreadsWorker').default; + +describe('WorkerPool', () => { + beforeEach(() => { + ChildProcessWorker.mockClear(); + NodeThreadWorker.mockClear(); + }); + + it('should create a ChildProcessWorker and send to it', () => { + jest.mock('worker_threads', () => { + throw Error('Undefined'); + }); + const workerPool = new WorkerPool('/path', { + forkOptions: {}, + maxRetries: 1, + numWorkers: 1, + workerId: 0, + workerPath: '/path', + }); + + const onStart = () => {}; + const onEnd = () => {}; + workerPool.send(0, {foo: 'bar'}, onStart, onEnd); + + expect(ChildProcessWorker).toBeCalledWith({ + forkOptions: {}, + maxRetries: 1, + workerId: 0, + workerPath: '/path', + }); + expect(NodeThreadWorker).not.toBeCalled(); + expect(workerPool._workers[0].send).toBeCalledWith( + {foo: 'bar'}, + onStart, + onEnd, + ); + }); + + it('should create a NodeThreadWorker and send to it', () => { + jest.mock('worker_threads', () => 'Defined'); + const workerPool = new WorkerPool('/path', { + forkOptions: {}, + maxRetries: 1, + numWorkers: 1, + workerId: 0, + workerPath: '/path', + }); + + const onStart = () => {}; + const onEnd = () => {}; + workerPool.send(0, {foo: 'bar'}, onStart, onEnd); + + expect(NodeThreadWorker).toBeCalledWith({ + forkOptions: {}, + maxRetries: 1, + workerId: 0, + workerPath: '/path', + }); + expect(ChildProcessWorker).not.toBeCalled(); + expect(workerPool._workers[0].send).toBeCalledWith( + {foo: 'bar'}, + onStart, + onEnd, + ); + }); +}); diff --git a/packages/jest-worker/src/__tests__/index-integration.test.js b/packages/jest-worker/src/__tests__/index-integration.test.js deleted file mode 100644 index 6d3b122c1908..000000000000 --- a/packages/jest-worker/src/__tests__/index-integration.test.js +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -'use strict'; - -import EventEmitter from 'events'; - -import {CHILD_MESSAGE_CALL, PARENT_MESSAGE_OK} from '../types'; - -let Farm; -let mockForkedProcesses; - -function mockBuildForkedProcess() { - const mockChild = new EventEmitter(); - - mockChild.send = jest.fn(); - - return mockChild; -} - -function replySuccess(i, result) { - mockForkedProcesses[i].emit('message', [PARENT_MESSAGE_OK, result]); -} - -function assertCallsToChild(childNum, ...calls) { - expect(mockForkedProcesses[childNum].send).toHaveBeenCalledTimes( - calls.length + 1, - ); - - calls.forEach(([methodName, ...args], numCall) => { - expect( - mockForkedProcesses[childNum].send.mock.calls[numCall + 1][0], - ).toEqual([CHILD_MESSAGE_CALL, true, methodName, args]); - }); -} - -beforeEach(() => { - mockForkedProcesses = []; - - jest.mock('child_process', () => ({ - fork() { - const forkedProcess = mockBuildForkedProcess(); - - mockForkedProcesses.push(forkedProcess); - - return forkedProcess; - }, - })); - - Farm = require('../index').default; -}); - -afterEach(() => { - jest.resetModules(); -}); - -it('calls a single method from the worker', async () => { - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 4, - }); - - const promise = farm.foo(); - - replySuccess(0, 42); - - expect(await promise).toBe(42); -}); - -it('distributes sequential calls across child processes', async () => { - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 4, - }); - - // The first call will go to the first child process. - const promise0 = farm.foo('param-0'); - - assertCallsToChild(0, ['foo', 'param-0']); - replySuccess(0, 'worker-0'); - expect(await promise0).toBe('worker-0'); - - // The second call will go to the second child process. - const promise1 = farm.foo(1); - - assertCallsToChild(1, ['foo', 1]); - replySuccess(1, 'worker-1'); - expect(await promise1).toBe('worker-1'); -}); - -it('distributes concurrent calls across child processes', async () => { - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 4, - }); - - // Do 3 calls to the farm in parallel. - const promise0 = farm.foo('param-0'); - const promise1 = farm.foo('param-1'); - const promise2 = farm.foo('param-2'); - - // Check that the method calls are sent to each separate child process. - assertCallsToChild(0, ['foo', 'param-0']); - assertCallsToChild(1, ['foo', 'param-1']); - assertCallsToChild(2, ['foo', 'param-2']); - - // Send different responses from each child. - replySuccess(0, 'worker-0'); - replySuccess(1, 'worker-1'); - replySuccess(2, 'worker-2'); - - // Check - expect(await promise0).toBe('worker-0'); - expect(await promise1).toBe('worker-1'); - expect(await promise2).toBe('worker-2'); -}); - -it('sticks parallel calls to children', async () => { - const farm = new Farm('/tmp/baz.js', { - computeWorkerKey: () => '1234567890abcdef', - exposedMethods: ['foo', 'bar'], - numWorkers: 4, - }); - - // Do 3 calls to the farm in parallel. - const promise0 = farm.foo('param-0'); - const promise1 = farm.foo('param-1'); - const promise2 = farm.foo('param-2'); - - // Send different responses for each call (from the same child). - replySuccess(0, 'worker-0'); - replySuccess(0, 'worker-1'); - replySuccess(0, 'worker-2'); - - // Check that all the calls have been received by the same child). - assertCallsToChild( - 0, - ['foo', 'param-0'], - ['foo', 'param-1'], - ['foo', 'param-2'], - ); - - // Check that responses are correct. - expect(await promise0).toBe('worker-0'); - expect(await promise1).toBe('worker-1'); - expect(await promise2).toBe('worker-2'); -}); diff --git a/packages/jest-worker/src/__tests__/index.test.js b/packages/jest-worker/src/__tests__/index.test.js index ba49151b6ec2..af00ddfd6c9d 100644 --- a/packages/jest-worker/src/__tests__/index.test.js +++ b/packages/jest-worker/src/__tests__/index.test.js @@ -8,44 +8,33 @@ 'use strict'; let Farm; -let Worker; -let mockWorkers; - -function workerReplyStart(i) { - mockWorkers[i].send.mock.calls[0][1](mockWorkers[i]); -} - -function workerReplyEnd(i, error, result) { - mockWorkers[i].send.mock.calls[0][2](error, result); -} - -function workerReply(i, error, result) { - workerReplyStart(i); - workerReplyEnd(i, error, result); -} +let WorkerPool; +let Queue; beforeEach(() => { - mockWorkers = []; - - // The worker mock returns a worker with custom methods, plus it stores them - // in a global list, so that they can be accessed later. This list is reset in - // every test. - jest.mock('../Worker', () => { - const fakeClass = jest.fn(() => { - const fakeWorker = { - getStderr: () => ({once() {}, pipe() {}}), - getStdout: () => ({once() {}, pipe() {}}), - send: jest.fn(), - }; + jest.mock('../Farm', () => { + const fakeClass = jest.fn(() => ({ + doWork: jest.fn().mockResolvedValue(42), + })); - mockWorkers.push(fakeWorker); + return { + __esModule: true, + default: fakeClass, + }; + }); - return fakeWorker; - }); + jest.mock('../WorkerPool', () => { + const fakeWorker = jest.fn(() => ({ + createWorker: jest.fn(), + end: jest.fn(), + getStderr: () => jest.fn(a => a), + getStdout: () => jest.fn(a => a), + send: jest.fn(), + })); return { __esModule: true, - default: fakeClass, + default: fakeWorker, }; }); @@ -63,16 +52,36 @@ beforeEach(() => { virtual: true, }); - Worker = require('../Worker').default; - Farm = require('../index').default; + Farm = require('..').default; + Queue = require('../Farm').default; + WorkerPool = require('../WorkerPool').default; }); afterEach(() => { jest.resetModules(); }); -it('exposes the right API', () => { +it('exposes the right API using default working', () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + expect(typeof farm.foo).toBe('function'); + expect(typeof farm.bar).toBe('function'); +}); + +it('exposes the right API using passed worker', () => { + const WorkerPool = jest.fn(() => ({ + createWorker: jest.fn(), + end: jest.fn(), + getStderr: () => jest.fn(a => a), + getStdout: () => jest.fn(a => a), + send: jest.fn(), + })); + const farm = new Farm('/tmp/baz.js', { + WorkerPool, exposedMethods: ['foo', 'bar'], numWorkers: 4, }); @@ -94,128 +103,19 @@ it('breaks if any of the forbidden methods is tried to be exposed', () => { }); it('works with minimal options', () => { - // eslint-disable-next-line no-new const farm1 = new Farm('/fake-worker.js'); - expect(Worker).toHaveBeenCalledTimes(require('os').cpus().length - 1); + expect(Queue).toHaveBeenCalledTimes(1); + expect(WorkerPool).toHaveBeenCalledTimes(1); expect(typeof farm1.methodA).toBe('function'); expect(typeof farm1.methodB).toBe('function'); expect(typeof farm1._shouldNotExist).not.toBe('function'); - // eslint-disable-next-line no-new const farm2 = new Farm('/fake-worker-with-default-method.js'); expect(typeof farm2.default).toBe('function'); }); -it('tries instantiating workers with the right options', () => { - // eslint-disable-next-line no-new - new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - forkOptions: {execArgv: []}, - maxRetries: 6, - numWorkers: 4, - }); - - expect(Worker).toHaveBeenCalledTimes(4); - expect(Worker.mock.calls[0][0]).toEqual({ - forkOptions: {execArgv: []}, - maxRetries: 6, - setupArgs: [], - workerId: 1, - workerPath: '/tmp/baz.js', - }); -}); - -it('create multiple workers with unique worker ids', () => { - // eslint-disable-next-line no-new - new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - forkOptions: {execArgv: []}, - maxRetries: 6, - numWorkers: 3, - }); - - expect(Worker).toHaveBeenCalledTimes(3); - expect(Worker.mock.calls[0][0].workerId).toEqual(1); - expect(Worker.mock.calls[1][0].workerId).toEqual(2); - expect(Worker.mock.calls[2][0].workerId).toEqual(3); -}); - -it('makes a non-existing relative worker throw', () => { - expect( - () => - new Farm('./baz.js', { - exposedMethods: [], - numWorkers: 1, - }), - ).toThrow(); -}); - -it('aggregates all stdouts and stderrs from all workers', () => { - const out = []; - const err = []; - - Worker.mockImplementation(() => ({ - getStderr: () => ({ - once() {}, - pipe(errStream) { - err.push(errStream); - }, - }), - getStdout: () => ({ - once() {}, - pipe(outStream) { - out.push(outStream); - }, - }), - })); - - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 2, - }); - - expect(out.length).toBe(2); - expect(err.length).toBe(2); - - const stdout = jest.fn(); - const stderr = jest.fn(); - - farm.getStdout().on('data', stdout); - farm.getStderr().on('data', stderr); - - out[0].write(Buffer.from('hello')); - out[1].write(Buffer.from('bye')); - err[1].write(Buffer.from('house')); - err[0].write(Buffer.from('tree')); - - expect(stdout.mock.calls[0][0].toString()).toBe('hello'); - expect(stdout.mock.calls[1][0].toString()).toBe('bye'); - expect(stderr.mock.calls[0][0].toString()).toBe('house'); - expect(stderr.mock.calls[1][0].toString()).toBe('tree'); -}); - -it('works when stdout and stderr are not piped to the parent', () => { - Worker.mockImplementation(() => ({ - getStderr: () => null, - getStdout: () => null, - send: () => null, - })); - - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - forkOptions: { - silent: false, - stdio: 'inherit', - }, - numWorkers: 2, - }); - - expect(() => farm.foo()).not.toThrow(); - expect(() => farm.bar()).not.toThrow(); -}); - it('does not let make calls after the farm is ended', () => { const farm = new Farm('/tmp/baz.js', { exposedMethods: ['foo', 'bar'], @@ -224,8 +124,13 @@ it('does not let make calls after the farm is ended', () => { farm.end(); - expect(() => farm.foo()).toThrow(); - expect(() => farm.bar()).toThrow(); + expect(farm._workerPool.end).toHaveBeenCalledTimes(1); + expect(() => farm.foo()).toThrow( + 'Farm is ended, no more calls can be done to it', + ); + expect(() => farm.bar()).toThrow( + 'Farm is ended, no more calls can be done to it', + ); }); it('does not let end the farm after it is ended', () => { @@ -235,24 +140,17 @@ it('does not let end the farm after it is ended', () => { }); farm.end(); - - expect(() => farm.end()).toThrow(); -}); - -it('calls "computeWorkerKey" for each of the calls', () => { - const computeWorkerKey = jest.fn(); - const farm = new Farm('/tmp/baz.js', { - computeWorkerKey, - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - farm.foo('car', 'plane'); - - expect(computeWorkerKey.mock.calls[0]).toEqual(['foo', 'car', 'plane']); + expect(farm._workerPool.end).toHaveBeenCalledTimes(1); + expect(() => farm.end()).toThrow( + 'Farm is ended, no more calls can be done to it', + ); + expect(() => farm.end()).toThrow( + 'Farm is ended, no more calls can be done to it', + ); + expect(farm._workerPool.end).toHaveBeenCalledTimes(1); }); -it('returns the result if the call worked', async () => { +it('calls doWork', async () => { const farm = new Farm('/tmp/baz.js', { exposedMethods: ['foo', 'bar'], numWorkers: 1, @@ -260,164 +158,15 @@ it('returns the result if the call worked', async () => { const promise = farm.foo('car', 'plane'); - workerReply(0, null, 34); - expect(await promise).toEqual(34); + expect(await promise).toEqual(42); }); -it('throws if the call failed', async () => { +it('calls getStderr and getStdout from worker', async () => { const farm = new Farm('/tmp/baz.js', { exposedMethods: ['foo', 'bar'], numWorkers: 1, }); - const promise = farm.foo('car', 'plane'); - let error = null; - - workerReply(0, new TypeError('Massively broken')); - - try { - await promise; - } catch (err) { - error = err; - } - - expect(error).not.toBe(null); - expect(error).toBeInstanceOf(TypeError); -}); - -it('sends non-sticked tasks to all workers', () => { - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - farm.foo('car', 'plane'); - - expect(mockWorkers[0].send).toHaveBeenCalledTimes(1); - expect(mockWorkers[1].send).toHaveBeenCalledTimes(1); - expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); -}); - -it('sends first-time sticked tasks to all workers', () => { - const farm = new Farm('/tmp/baz.js', { - computeWorkerKey: () => '1234567890abcdef', - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - farm.foo('car', 'plane'); - - expect(mockWorkers[0].send).toHaveBeenCalledTimes(1); - expect(mockWorkers[1].send).toHaveBeenCalledTimes(1); - expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); -}); - -it('checks that once a sticked task finishes, next time is sent to that worker', async () => { - const farm = new Farm('/tmp/baz.js', { - computeWorkerKey: () => '1234567890abcdef', - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - // Worker 1 successfully replies with "17" as a result. - farm.foo('car', 'plane'); - workerReply(1, null, 17); - - // Note that the stickiness is not created by the method name or the arguments - // it is solely controlled by the provided "computeWorkerKey" method, which in - // the test example always returns the same key, so all calls should be - // redirected to worker 1 (which is the one that resolved the first call). - farm.bar(); - - // The first time, a call with a "1234567890abcdef" hash had never been done - // earlier ("foo" call), so it got queued to all workers. Later, since the one - // that resolved the call was the one in position 1, all subsequent calls are - // only redirected to that worker. - expect(mockWorkers[0].send).toHaveBeenCalledTimes(1); // Only "foo". - expect(mockWorkers[1].send).toHaveBeenCalledTimes(2); // "foo" + "bar". - expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); // Only "foo". -}); - -it('checks that even before a sticked task finishes, next time is sent to that worker', async () => { - const farm = new Farm('/tmp/baz.js', { - computeWorkerKey: () => '1234567890abcdef', - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - // Call "foo". Not that the worker is sending a start response synchronously. - farm.foo('car', 'plane'); - workerReplyStart(1); - - // Call "bar". Not that the worker is sending a start response synchronously. - farm.bar(); - workerReplyStart(1); - - // The first time, a call with a "1234567890abcdef" hash had never been done - // earlier ("foo" call), so it got queued to all workers. Later, since the one - // that resolved the call was the one in position 1, all subsequent calls are - // only redirected to that worker. - expect(mockWorkers[0].send).toHaveBeenCalledTimes(1); // Only "foo". - expect(mockWorkers[1].send).toHaveBeenCalledTimes(2); // "foo" + "bar". - expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); // Only "foo". -}); - -it('checks that once a non-sticked task finishes, next time is sent to all workers', async () => { - // Note there is no "computeWorkerKey". - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - // Worker 1 successfully replies with "17" as a result. - const promise = farm.foo('car', 'plane'); - workerReply(1, null, 17); - await promise; - - farm.bar(); - - // Since "computeWorkerKey" does not return anything, new jobs are sent again to - // all existing workers. - expect(mockWorkers[0].send).toHaveBeenCalledTimes(2); - expect(mockWorkers[1].send).toHaveBeenCalledTimes(2); - expect(mockWorkers[2].send).toHaveBeenCalledTimes(2); -}); - -it('rotates workers when they are idling', async () => { - let order; - let promise; - - // Note there is no "computeWorkerKey". - const farm = new Farm('/tmp/baz.js', { - exposedMethods: ['foo', 'bar'], - numWorkers: 3, - }); - - [0, 1, 2].forEach(i => { - mockWorkers[i].send.mockReset(); - mockWorkers[i].send.mockImplementation(() => order.push(i)); - }); - - // First time, the order is 0, 1, 2. - order = []; - promise = farm.foo('car', 'plane'); - expect(order).toEqual([0, 1, 2]); - - // Worker 1 successfully replies with "17" as a result. - workerReply(1, null, 17); - await promise; - - [0, 1, 2].forEach(i => { - mockWorkers[i].send.mockReset(); - mockWorkers[i].send.mockImplementation(() => order.push(i)); - }); - - // Now, the order is 1, 2, 0 (shifted one). - order = []; - promise = farm.foo('car', 'plane'); - expect(order).toEqual([1, 2, 0]); - - // Worker 1 successfully replies again. - workerReply(1, null, 17); - await promise; + expect(farm.getStderr()('err')).toEqual('err'); + expect(farm.getStdout()('out')).toEqual('out'); }); diff --git a/packages/jest-worker/src/__tests__/process-integration.test.js b/packages/jest-worker/src/__tests__/process-integration.test.js new file mode 100644 index 000000000000..51e565dfd101 --- /dev/null +++ b/packages/jest-worker/src/__tests__/process-integration.test.js @@ -0,0 +1,157 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import EventEmitter from 'events'; + +import {CHILD_MESSAGE_CALL, PARENT_MESSAGE_OK} from '../types'; + +let Farm; +let mockForkedProcesses; + +function mockBuildForkedProcess() { + const mockChild = new EventEmitter(); + + mockChild.send = jest.fn(); + + return mockChild; +} + +function replySuccess(i, result) { + mockForkedProcesses[i].emit('message', [PARENT_MESSAGE_OK, result]); +} + +function assertCallsToChild(childNum, ...calls) { + expect(mockForkedProcesses[childNum].send).toHaveBeenCalledTimes( + calls.length + 1, + ); + + calls.forEach(([methodName, ...args], numCall) => { + expect( + mockForkedProcesses[childNum].send.mock.calls[numCall + 1][0], + ).toEqual([CHILD_MESSAGE_CALL, true, methodName, args]); + }); +} + +jest.mock('worker_threads', () => { + throw Error('Unsupported'); +}); + +describe('Jest Worker Integration', () => { + beforeEach(() => { + mockForkedProcesses = []; + + jest.mock('child_process', () => ({ + fork() { + const forkedProcess = mockBuildForkedProcess(); + + mockForkedProcesses.push(forkedProcess); + + return forkedProcess; + }, + })); + + Farm = require('../index').default; + }); + + afterEach(() => { + jest.resetModules(); + }); + + it('calls a single method from the worker', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + const promise = farm.foo(); + + replySuccess(0, 42); + + expect(await promise).toBe(42); + }); + + it('distributes sequential calls across child processes', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // The first call will go to the first child process. + const promise0 = farm.foo('param-0'); + + assertCallsToChild(0, ['foo', 'param-0']); + replySuccess(0, 'worker-0'); + expect(await promise0).toBe('worker-0'); + + // The second call will go to the second child process. + const promise1 = farm.foo(1); + + assertCallsToChild(1, ['foo', 1]); + replySuccess(1, 'worker-1'); + expect(await promise1).toBe('worker-1'); + }); + + it('distributes concurrent calls across child processes', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // Do 3 calls to the farm in parallel. + const promise0 = farm.foo('param-0'); + const promise1 = farm.foo('param-1'); + const promise2 = farm.foo('param-2'); + + // Check that the method calls are sent to each separate child process. + assertCallsToChild(0, ['foo', 'param-0']); + assertCallsToChild(1, ['foo', 'param-1']); + assertCallsToChild(2, ['foo', 'param-2']); + + // Send different responses from each child. + replySuccess(0, 'worker-0'); + replySuccess(1, 'worker-1'); + replySuccess(2, 'worker-2'); + + // Check + expect(await promise0).toBe('worker-0'); + expect(await promise1).toBe('worker-1'); + expect(await promise2).toBe('worker-2'); + }); + + it('sticks parallel calls to children', async () => { + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // Do 3 calls to the farm in parallel. + const promise0 = farm.foo('param-0'); + const promise1 = farm.foo('param-1'); + const promise2 = farm.foo('param-2'); + + // Send different responses for each call (from the same child). + replySuccess(0, 'worker-0'); + replySuccess(0, 'worker-1'); + replySuccess(0, 'worker-2'); + + // Check that all the calls have been received by the same child). + assertCallsToChild( + 0, + ['foo', 'param-0'], + ['foo', 'param-1'], + ['foo', 'param-2'], + ); + + // Check that responses are correct. + expect(await promise0).toBe('worker-0'); + expect(await promise1).toBe('worker-1'); + expect(await promise2).toBe('worker-2'); + }); +}); diff --git a/packages/jest-worker/src/__tests__/thread-integration.test.js b/packages/jest-worker/src/__tests__/thread-integration.test.js new file mode 100644 index 000000000000..9c701c362fa1 --- /dev/null +++ b/packages/jest-worker/src/__tests__/thread-integration.test.js @@ -0,0 +1,158 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import EventEmitter from 'events'; + +import {CHILD_MESSAGE_CALL, PARENT_MESSAGE_OK} from '../types'; + +let Farm; +let mockForkedProcesses; + +function mockBuildForkedProcess() { + const mockChild = new EventEmitter(); + + mockChild.postMessage = jest.fn(); + + return mockChild; +} + +function replySuccess(i, result) { + mockForkedProcesses[i].emit('message', [PARENT_MESSAGE_OK, result]); +} + +function assertCallsToChild(childNum, ...calls) { + expect(mockForkedProcesses[childNum].postMessage).toHaveBeenCalledTimes( + calls.length + 1, + ); + + calls.forEach(([methodName, ...args], numCall) => { + expect( + mockForkedProcesses[childNum].postMessage.mock.calls[numCall + 1][0], + ).toEqual([CHILD_MESSAGE_CALL, true, methodName, args]); + }); +} + +describe('Jest Worker Process Integration', () => { + beforeEach(() => { + mockForkedProcesses = []; + + jest.mock('worker_threads', () => { + const fakeClass = jest.fn(() => { + const forkedProcess = mockBuildForkedProcess(); + + mockForkedProcesses.push(forkedProcess); + + return forkedProcess; + }); + + return { + Worker: fakeClass, + __esModule: true, + }; + }); + + Farm = require('../index').default; + }); + + afterEach(() => { + jest.resetModules(); + }); + + it('calls a single method from the worker', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + const promise = farm.foo(); + + replySuccess(0, 42); + + expect(await promise).toBe(42); + }); + + it('distributes sequential calls across child processes', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // The first call will go to the first child process. + const promise0 = farm.foo('param-0'); + + assertCallsToChild(0, ['foo', 'param-0']); + replySuccess(0, 'worker-0'); + expect(await promise0).toBe('worker-0'); + + // The second call will go to the second child process. + const promise1 = farm.foo(1); + + assertCallsToChild(1, ['foo', 1]); + replySuccess(1, 'worker-1'); + expect(await promise1).toBe('worker-1'); + }); + + it('distributes concurrent calls across child processes', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // Do 3 calls to the farm in parallel. + const promise0 = farm.foo('param-0'); + const promise1 = farm.foo('param-1'); + const promise2 = farm.foo('param-2'); + + // Check that the method calls are sent to each separate child process. + assertCallsToChild(0, ['foo', 'param-0']); + assertCallsToChild(1, ['foo', 'param-1']); + assertCallsToChild(2, ['foo', 'param-2']); + + // Send different responses from each child. + replySuccess(0, 'worker-0'); + replySuccess(1, 'worker-1'); + replySuccess(2, 'worker-2'); + + // Check + expect(await promise0).toBe('worker-0'); + expect(await promise1).toBe('worker-1'); + expect(await promise2).toBe('worker-2'); + }); + + it('sticks parallel calls to children', async () => { + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // Do 3 calls to the farm in parallel. + const promise0 = farm.foo('param-0'); + const promise1 = farm.foo('param-1'); + const promise2 = farm.foo('param-2'); + + // Send different responses for each call (from the same child). + replySuccess(0, 'worker-0'); + replySuccess(0, 'worker-1'); + replySuccess(0, 'worker-2'); + + // Check that all the calls have been received by the same child). + assertCallsToChild( + 0, + ['foo', 'param-0'], + ['foo', 'param-1'], + ['foo', 'param-2'], + ); + + // Check that responses are correct. + expect(await promise0).toBe('worker-0'); + expect(await promise1).toBe('worker-1'); + expect(await promise2).toBe('worker-2'); + }); +}); diff --git a/packages/jest-worker/src/base/BaseWorkerPool.js b/packages/jest-worker/src/base/BaseWorkerPool.js new file mode 100644 index 000000000000..56d9e2dea48e --- /dev/null +++ b/packages/jest-worker/src/base/BaseWorkerPool.js @@ -0,0 +1,101 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import mergeStream from 'merge-stream'; +import path from 'path'; + +import {CHILD_MESSAGE_END} from '../types'; + +import type {Readable} from 'stream'; +import type {WorkerPoolOptions, WorkerOptions, WorkerInterface} from '../types'; + +/* istanbul ignore next */ +const emptyMethod = () => {}; + +export default class BaseWorkerPool { + _stderr: Readable; + _stdout: Readable; + _options: WorkerPoolOptions; + _workers: Array; + + constructor(workerPath: string, options: WorkerPoolOptions) { + this._options = options; + this._workers = new Array(options.numWorkers); + + if (!path.isAbsolute(workerPath)) { + workerPath = require.resolve(workerPath); + } + + const stdout = mergeStream(); + const stderr = mergeStream(); + + const {forkOptions, maxRetries, setupArgs} = options; + + for (let i = 0; i < options.numWorkers; i++) { + const workerOptions: WorkerOptions = { + forkOptions, + maxRetries, + setupArgs, + workerId: i, + workerPath, + }; + + const worker = this.createWorker(workerOptions); + const workerStdout = worker.getStdout(); + const workerStderr = worker.getStderr(); + + if (workerStdout) { + stdout.add(workerStdout); + } + + if (workerStderr) { + stderr.add(workerStderr); + } + + this._workers[i] = worker; + } + + this._stdout = stdout; + this._stderr = stderr; + } + + getStderr(): Readable { + return this._stderr; + } + + getStdout(): Readable { + return this._stdout; + } + + getWorkers(): Array { + return this._workers; + } + + getWorkerById(workerId: number): WorkerInterface { + return this._workers[workerId]; + } + + createWorker(workerOptions: WorkerOptions): WorkerInterface { + throw Error('Missing method createWorker in WorkerPool'); + } + + end(): void { + // We do not cache the request object here. If so, it would only be only + // processed by one of the workers, and we want them all to close. + for (let i = 0; i < this._workers.length; i++) { + this._workers[i].send( + [CHILD_MESSAGE_END, false], + emptyMethod, + emptyMethod, + ); + } + } +} diff --git a/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js b/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js new file mode 100644 index 000000000000..ca7297a58d1b --- /dev/null +++ b/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js @@ -0,0 +1,224 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import {CHILD_MESSAGE_END} from '../../types'; + +import BaseWorkerPool from '../BaseWorkerPool'; + +const Worker = jest.fn(() => ({ + getStderr: () => ({once() {}, pipe() {}}), + getStdout: () => ({once() {}, pipe() {}}), + send: jest.fn(), +})); + +const mockSend = jest.fn(); + +class MockWorkerPool extends BaseWorkerPool { + createWorker(...args) { + return new Worker(...args); + } + send(...args) { + return mockSend(...args); + } +} + +describe('BaseWorkerPool', () => { + beforeEach(() => { + Worker.mockClear(); + }); + + it('throws error when createWorker is not defined', () => { + expect( + () => + new BaseWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [], + }), + ).toThrow('Missing method createWorker in WorkerPool'); + }); + + it('creates and exposes n workers', () => { + const pool = new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [], + }); + + expect(pool.getWorkers()).toHaveLength(4); + expect(pool.getWorkerById(0)).toBeDefined(); + expect(pool.getWorkerById(1)).toBeDefined(); + expect(pool.getWorkerById(2)).toBeDefined(); + expect(pool.getWorkerById(3)).toBeDefined(); + }); + + it('ends all workers', () => { + const pool = new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [], + }); + + const workers = pool.getWorkers(); + pool.end(); + + const endMessage = [CHILD_MESSAGE_END, false]; + expect(workers[0].send.mock.calls[0][0]).toEqual(endMessage); + expect(workers[1].send.mock.calls[0][0]).toEqual(endMessage); + expect(workers[2].send.mock.calls[0][0]).toEqual(endMessage); + expect(workers[3].send.mock.calls[0][0]).toEqual(endMessage); + }); + + it('creates and expoeses n workers', () => { + const pool = new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [], + }); + + expect(pool.getWorkers()).toHaveLength(4); + expect(pool.getWorkerById(0)).toBeDefined(); + expect(pool.getWorkerById(1)).toBeDefined(); + expect(pool.getWorkerById(2)).toBeDefined(); + expect(pool.getWorkerById(3)).toBeDefined(); + }); + + it('creates workers with the right options', () => { + // eslint-disable-next-line no-new + new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [{foo: 'bar'}], + }); + + expect(Worker).toHaveBeenCalledTimes(4); + expect(Worker).toHaveBeenNthCalledWith(1, { + forkOptions: {execArgv: []}, + maxRetries: 6, + setupArgs: [{foo: 'bar'}], + workerId: 0, + workerPath: '/tmp/baz.js', + }); + expect(Worker).toHaveBeenNthCalledWith(2, { + forkOptions: {execArgv: []}, + maxRetries: 6, + setupArgs: [{foo: 'bar'}], + workerId: 1, + workerPath: '/tmp/baz.js', + }); + expect(Worker).toHaveBeenNthCalledWith(3, { + forkOptions: {execArgv: []}, + maxRetries: 6, + setupArgs: [{foo: 'bar'}], + workerId: 2, + workerPath: '/tmp/baz.js', + }); + expect(Worker).toHaveBeenNthCalledWith(4, { + forkOptions: {execArgv: []}, + maxRetries: 6, + setupArgs: [{foo: 'bar'}], + workerId: 3, + workerPath: '/tmp/baz.js', + }); + }); + + it('makes a non-existing relative worker throw', () => { + expect(() => { + // eslint-disable-next-line no-new + new MockWorkerPool('./baz.js', { + exposedMethods: [], + numWorkers: 1, + }); + }).toThrow(); + }); + + it('create multiple workers with unique worker ids', () => { + // eslint-disable-next-line no-new + new MockWorkerPool('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 3, + }); + + expect(Worker).toHaveBeenCalledTimes(3); + expect(Worker.mock.calls[0][0].workerId).toEqual(0); + expect(Worker.mock.calls[1][0].workerId).toEqual(1); + expect(Worker.mock.calls[2][0].workerId).toEqual(2); + }); + + it('aggregates all stdouts and stderrs from all workers', () => { + const out = []; + const err = []; + + Worker.mockImplementation(() => ({ + getStderr: () => ({ + once() {}, + pipe(errStream) { + err.push(errStream); + }, + }), + getStdout: () => ({ + once() {}, + pipe(outStream) { + out.push(outStream); + }, + }), + })); + + const farm = new MockWorkerPool('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 2, + }); + + expect(out.length).toBe(2); + expect(err.length).toBe(2); + + const stdout = jest.fn(); + const stderr = jest.fn(); + + farm.getStdout().on('data', stdout); + farm.getStderr().on('data', stderr); + + out[0].write(Buffer.from('hello')); + out[1].write(Buffer.from('bye')); + err[1].write(Buffer.from('house')); + err[0].write(Buffer.from('tree')); + + expect(stdout.mock.calls[0][0].toString()).toBe('hello'); + expect(stdout.mock.calls[1][0].toString()).toBe('bye'); + expect(stderr.mock.calls[0][0].toString()).toBe('house'); + expect(stderr.mock.calls[1][0].toString()).toBe('tree'); + }); + + it('works when stdout and stderr are not piped to the parent', () => { + Worker.mockImplementation(() => ({ + getStderr: () => null, + getStdout: () => null, + send: () => null, + })); + + const farm = new MockWorkerPool('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + forkOptions: { + silent: false, + stdio: 'inherit', + }, + numWorkers: 2, + }); + + expect(() => farm.send()).not.toThrow(); + expect(() => farm.send()).not.toThrow(); + }); +}); diff --git a/packages/jest-worker/src/index.js b/packages/jest-worker/src/index.js index be634f2fecbd..fbaff49e7b02 100644 --- a/packages/jest-worker/src/index.js +++ b/packages/jest-worker/src/index.js @@ -9,18 +9,38 @@ 'use strict'; -import mergeStream from 'merge-stream'; import os from 'os'; -import path from 'path'; - -import type {FarmOptions} from './types'; +import WorkerPool from './WorkerPool'; +import Farm from './Farm'; +import type { + WorkerPoolInterface, + WorkerPoolOptions, + FarmOptions, +} from './types'; import type {Readable} from 'stream'; -import {CHILD_MESSAGE_CALL, CHILD_MESSAGE_END} from './types'; -import Worker from './Worker'; +function getExposedMethods( + workerPath: string, + options: FarmOptions, +): $ReadOnlyArray { + let exposedMethods = options.exposedMethods; + + // If no methods list is given, try getting it by auto-requiring the module. + if (!exposedMethods) { + // $FlowFixMe: This has to be a dynamic require. + const module: Function | Object = require(workerPath); + + exposedMethods = Object.keys(module).filter( + name => typeof module[name] === 'function', + ); + + if (typeof module === 'function') { + exposedMethods.push('default'); + } + } -/* istanbul ignore next */ -const emptyMethod = () => {}; + return exposedMethods; +} /** * The Jest farm (publicly called "Worker") is a class that allows you to queue @@ -29,7 +49,7 @@ const emptyMethod = () => {}; * of the child processes, and bridged to the main process. * * Bridged methods are specified by using the "exposedMethods" property of the - * options "object". This is an array of strings, where each of them corresponds + * "options" object. This is an array of strings, where each of them corresponds * to the exported name in the loaded module. * * You can also control the amount of workers by using the "numWorkers" property @@ -43,72 +63,41 @@ const emptyMethod = () => {}; * * - Sticky method: if a "computeWorkerKey" method is provided within the * config, the resulting string of this method will be used as a key. - * Everytime this key is returned, it is guaranteed that your job will be + * Every time this key is returned, it is guaranteed that your job will be * processed by the same worker. This is specially useful if your workers are * caching results. */ -export default class { - _stdout: Readable; - _stderr: Readable; +export default class JestWorker { _ending: boolean; - _cacheKeys: {[string]: Worker, __proto__: null}; + _farm: Farm; _options: FarmOptions; - _workers: Array; - _offset: number; - - constructor(workerPath: string, options?: FarmOptions = {}) { - const numWorkers = options.numWorkers || Math.max(os.cpus().length - 1, 1); - const workers = new Array(numWorkers); - const stdout = mergeStream(); - const stderr = mergeStream(); + _workerPool: WorkerPoolInterface; - if (!path.isAbsolute(workerPath)) { - workerPath = require.resolve(workerPath); - } + constructor(workerPath: string, options?: FarmOptions) { + this._options = Object.assign({}, options); - const sharedWorkerOptions = { - forkOptions: options.forkOptions || {}, - maxRetries: options.maxRetries || 3, - setupArgs: options.setupArgs || [], - workerPath, + const workerPoolOptions: WorkerPoolOptions = { + forkOptions: this._options.forkOptions || {}, + maxRetries: this._options.maxRetries || 3, + numWorkers: this._options.numWorkers || Math.max(os.cpus().length - 1, 1), + setupArgs: this._options.setupArgs || [], }; - for (let i = 0; i < numWorkers; i++) { - const workerOptions = Object.assign({}, sharedWorkerOptions, { - workerId: i + 1, - }); - const worker = new Worker(workerOptions); - const workerStdout = worker.getStdout(); - const workerStderr = worker.getStderr(); + this._workerPool = this._options.WorkerPool + ? new this._options.WorkerPool(workerPath, workerPoolOptions) + : new WorkerPool(workerPath, workerPoolOptions); - if (workerStdout) { - stdout.add(workerStdout); - } - - if (workerStderr) { - stderr.add(workerStderr); - } + this._farm = new Farm( + workerPoolOptions.numWorkers, + this._workerPool.send.bind(this._workerPool), + this._options.computeWorkerKey, + ); - workers[i] = worker; - } - - let exposedMethods = options.exposedMethods; - - // If no methods list is given, try getting it by auto-requiring the module. - if (!exposedMethods) { - // $FlowFixMe: This has to be a dynamic require. - const child = require(workerPath); - - exposedMethods = Object.keys(child).filter( - name => typeof child[name] === 'function', - ); - - if (typeof child === 'function') { - exposedMethods.push('default'); - } - } + this._bindExposedWorkerMethods(workerPath, this._options); + } - exposedMethods.forEach(name => { + _bindExposedWorkerMethods(workerPath: string, options: FarmOptions): void { + getExposedMethods(workerPath, options).forEach(name => { if (name.startsWith('_')) { return; } @@ -118,95 +107,34 @@ export default class { } // $FlowFixMe: dynamic extension of the class instance is expected. - this[name] = this._makeCall.bind(this, name); + this[name] = this._callFunctionWithArgs.bind(this, name); }); - - this._stdout = stdout; - this._stderr = stderr; - this._ending = false; - this._cacheKeys = Object.create(null); - this._options = options; - this._workers = workers; - this._offset = 0; } - getStdout(): Readable { - return this._stdout; - } - - getStderr(): Readable { - return this._stderr; - } - - end() { + // eslint-disable-next-line no-unclear-flowtypes + _callFunctionWithArgs(method: string, ...args: Array): Promise { if (this._ending) { throw new Error('Farm is ended, no more calls can be done to it'); } - const workers = this._workers; + return this._farm.doWork(method, ...args); + } - // We do not cache the request object here. If so, it would only be only - // processed by one of the workers, and we want them all to close. - for (let i = 0; i < workers.length; i++) { - workers[i].send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod); - } + getStderr(): Readable { + return this._workerPool.getStderr(); + } - this._ending = true; + getStdout(): Readable { + return this._workerPool.getStdout(); } - // eslint-disable-next-line no-unclear-flowtypes - _makeCall(method: string, ...args: Array): Promise { + end(): void { if (this._ending) { throw new Error('Farm is ended, no more calls can be done to it'); } - return new Promise((resolve, reject) => { - const {computeWorkerKey} = this._options; - const workers = this._workers; - const length = workers.length; - const cacheKeys = this._cacheKeys; - const request = [CHILD_MESSAGE_CALL, false, method, args]; - - let worker = null; - let hash = null; - - if (computeWorkerKey) { - hash = computeWorkerKey.apply(this, [method].concat(args)); - worker = hash == null ? null : cacheKeys[hash]; - } + this._workerPool.end(); - // Do not use a fat arrow since we need the "this" value, which points to - // the worker that executed the call. - const onProcessStart = worker => { - if (hash != null) { - cacheKeys[hash] = worker; - } - }; - - const onProcessEnd = (error, result) => { - if (error) { - reject(error); - } else { - resolve(result); - } - }; - - // If a worker is pre-selected, use it... - if (worker) { - worker.send(request, onProcessStart, onProcessEnd); - return; - } - - // ... otherwise use all workers, so the first one available will pick it. - for (let i = 0; i < length; i++) { - workers[(i + this._offset) % length].send( - request, - onProcessStart, - onProcessEnd, - ); - } - - this._offset++; - }); + this._ending = true; } } diff --git a/packages/jest-worker/src/types.js b/packages/jest-worker/src/types.js index 5d7108f9fb30..30f4e6254dfe 100644 --- a/packages/jest-worker/src/types.js +++ b/packages/jest-worker/src/types.js @@ -29,7 +29,8 @@ export type PARENT_MESSAGE_ERROR = // Option objects. -import type Worker from './Worker'; +import type {Readable} from 'stream'; +const EventEmitter = require('events'); export type ForkOptions = { cwd?: string, @@ -42,6 +43,24 @@ export type ForkOptions = { gid?: number, }; +export interface WorkerPoolInterface { + getStderr(): Readable; + getStdout(): Readable; + getWorkers(): Array; + createWorker(WorkerOptions): WorkerInterface; + send(number, ChildMessage, Function, Function): void; + end(): void; +} + +export interface WorkerInterface { + send(ChildMessage, Function, Function): void; + getWorkerId(): number; + getStderr(): Readable; + getStdout(): Readable; + onExit(number): void; + onMessage(any): void; +} + export type FarmOptions = { computeWorkerKey?: (string, ...Array) => ?string, exposedMethods?: $ReadOnlyArray, @@ -49,8 +68,19 @@ export type FarmOptions = { setupArgs?: Array, maxRetries?: number, numWorkers?: number, + WorkerPool?: ( + workerPath: string, + options?: WorkerPoolOptions, + ) => WorkerPoolInterface, }; +export type WorkerPoolOptions = {| + setupArgs: Array, + forkOptions: ForkOptions, + maxRetries: number, + numWorkers: number, +|}; + export type WorkerOptions = {| forkOptions: ForkOptions, setupArgs: Array, @@ -61,11 +91,22 @@ export type WorkerOptions = {| // Messages passed from the parent to the children. +export type MessagePort = { + ...typeof EventEmitter, + postMessage(any): void, +}; + +export type MessageChannel = { + port1: MessagePort, + port2: MessagePort, +}; + export type ChildMessageInitialize = [ typeof CHILD_MESSAGE_INITIALIZE, // type boolean, // processed string, // file ?Array, // setupArgs + ?MessagePort, // MessagePort ]; export type ChildMessageCall = [ @@ -103,13 +144,12 @@ export type ParentMessageError = [ export type ParentMessage = ParentMessageOk | ParentMessageError; // Queue types. - -export type OnProcessStart = Worker => void; -export type OnProcessEnd = (?Error, ?any) => void; +export type OnStart = WorkerInterface => void; +export type OnEnd = (?Error, ?any) => void; export type QueueChildMessage = {| request: ChildMessage, - onProcessStart: OnProcessStart, - onProcessEnd: OnProcessEnd, - next: ?QueueChildMessage, + onStart: OnStart, + onEnd: OnEnd, + next?: QueueChildMessage, |}; diff --git a/packages/jest-worker/src/Worker.js b/packages/jest-worker/src/workers/ChildProcessWorker.js similarity index 66% rename from packages/jest-worker/src/Worker.js rename to packages/jest-worker/src/workers/ChildProcessWorker.js index ed883b4a566f..ddff2ad348b3 100644 --- a/packages/jest-worker/src/Worker.js +++ b/packages/jest-worker/src/workers/ChildProcessWorker.js @@ -16,18 +16,13 @@ import { PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_SETUP_ERROR, PARENT_MESSAGE_OK, -} from './types'; + WorkerInterface, +} from '../types'; import type {ChildProcess} from 'child_process'; import type {Readable} from 'stream'; -import type { - ChildMessage, - QueueChildMessage, - OnProcessEnd, - OnProcessStart, - WorkerOptions, -} from './types'; +import type {ChildMessage, OnEnd, OnStart, WorkerOptions} from '../types'; /** * This class wraps the child process and provides a nice interface to @@ -47,49 +42,20 @@ import type { * field is changed to "true", so that other workers which might encounter the * same call skip it. */ -export default class { - _busy: boolean; +export default class ChildProcessWorker implements WorkerInterface { _child: ChildProcess; - _last: ?QueueChildMessage; _options: WorkerOptions; - _queue: ?QueueChildMessage; + _onProcessEnd: OnEnd; _retries: number; constructor(options: WorkerOptions) { this._options = options; - this._queue = null; - - this._initialize(); - } - - getStdout(): Readable { - return this._child.stdout; - } - - getStderr(): Readable { - return this._child.stderr; + this.initialize(); } - send( - request: ChildMessage, - onProcessStart: OnProcessStart, - onProcessEnd: OnProcessEnd, - ) { - const item = {next: null, onProcessEnd, onProcessStart, request}; - - if (this._last) { - this._last.next = item; - } else { - this._queue = item; - } - - this._last = item; - this._process(); - } - - _initialize() { + initialize() { const child = childProcess.fork( - require.resolve('./child'), + require.resolve('./processChild'), // $FlowFixMe: Flow does not work well with Object.assign. Object.assign( { @@ -105,8 +71,8 @@ export default class { ), ); - child.on('message', this._receive.bind(this)); - child.on('exit', this._exit.bind(this)); + child.on('message', this.onMessage.bind(this)); + child.on('exit', this.onExit.bind(this)); // $FlowFixMe: wrong "ChildProcess.send" signature. child.send([ @@ -116,9 +82,8 @@ export default class { this._options.setupArgs, ]); - this._retries++; this._child = child; - this._busy = false; + this._retries++; // If we exceeded the amount of retries, we will emulate an error reply // coming from the child. This avoids code duplication related with cleaning @@ -126,7 +91,7 @@ export default class { if (this._retries > this._options.maxRetries) { const error = new Error('Call retries were exceeded'); - this._receive([ + this.onMessage([ PARENT_MESSAGE_CLIENT_ERROR, error.name, error.message, @@ -136,56 +101,12 @@ export default class { } } - _process() { - if (this._busy) { - return; - } - - let item = this._queue; - - // Calls in the queue might have already been processed by another worker, - // so we have to skip them. - while (item && item.request[1]) { - item = item.next; - } - - this._queue = item; - - if (item) { - // Flag the call as processed, so that other workers know that they don't - // have to process it as well. - item.request[1] = true; - - // Tell the parent that this item is starting to be processed. - item.onProcessStart(this); - - this._retries = 0; - this._busy = true; - - // $FlowFixMe: wrong "ChildProcess.send" signature. - this._child.send(item.request); - } else { - this._last = item; - } - } - - _receive(response: any /* Should be ParentMessage */) { - const item = this._queue; - - if (!item) { - throw new TypeError('Unexpected response with an empty queue'); - } - - const onProcessEnd = item.onProcessEnd; - - this._busy = false; - this._process(); - + onMessage(response: any /* Should be ParentMessage */) { let error; switch (response[0]) { case PARENT_MESSAGE_OK: - onProcessEnd(null, response[1]); + this._onProcessEnd(null, response[1]); break; case PARENT_MESSAGE_CLIENT_ERROR: @@ -207,7 +128,7 @@ export default class { } } - onProcessEnd(error, null); + this._onProcessEnd(error, null); break; case PARENT_MESSAGE_SETUP_ERROR: @@ -217,7 +138,7 @@ export default class { error.type = response[1]; error.stack = response[3]; - onProcessEnd(error, null); + this._onProcessEnd(error, null); break; default: @@ -225,9 +146,30 @@ export default class { } } - _exit(exitCode: number) { + onExit(exitCode: number) { if (exitCode !== 0) { - this._initialize(); + this.initialize(); } } + + send(request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd) { + onProcessStart(this); + this._onProcessEnd = onProcessEnd; + + this._retries = 0; + // $FlowFixMe + this._child.send(request); + } + + getWorkerId(): number { + return this._options.workerId; + } + + getStdout(): Readable { + return this._child.stdout; + } + + getStderr(): Readable { + return this._child.stderr; + } } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.js b/packages/jest-worker/src/workers/NodeThreadsWorker.js new file mode 100644 index 000000000000..67e6fbf4b588 --- /dev/null +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.js @@ -0,0 +1,161 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import { + CHILD_MESSAGE_INITIALIZE, + PARENT_MESSAGE_OK, + PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_SETUP_ERROR, +} from '../types'; + +import path from 'path'; + +import type {Readable} from 'stream'; +import type { + ChildMessage, + OnEnd, + OnStart, + WorkerOptions, + WorkerInterface, +} from '../types'; + +// $FlowFixMe: Flow doesn't know about experimental features of Node +const {Worker} = require('worker_threads'); + +export default class ExperimentalWorker implements WorkerInterface { + _worker: Worker; + _options: WorkerOptions; + _onProcessEnd: OnEnd; + _retries: number; + + constructor(options: WorkerOptions) { + this._options = options; + this.initialize(); + } + + initialize() { + this._worker = new Worker(path.resolve(__dirname, './threadChild.js'), { + eval: false, + stderr: true, + stdout: true, + // $FlowFixMe: Flow does not work well with Object.assign. + workerData: Object.assign( + { + cwd: process.cwd(), + env: Object.assign({}, process.env, { + JEST_WORKER_ID: this._options.workerId, + }), + // Suppress --debug / --inspect flags while preserving others (like --harmony). + execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)), + silent: true, + }, + this._options.forkOptions, + ), + }); + + this._worker.on('message', this.onMessage.bind(this)); + this._worker.on('exit', this.onExit.bind(this)); + + this._worker.postMessage([ + CHILD_MESSAGE_INITIALIZE, + false, + this._options.workerPath, + this._options.setupArgs, + ]); + + this._retries++; + + // If we exceeded the amount of retries, we will emulate an error reply + // coming from the child. This avoids code duplication related with cleaning + // the queue, and scheduling the next call. + if (this._retries > this._options.maxRetries) { + const error = new Error('Call retries were exceeded'); + + this.onMessage([ + PARENT_MESSAGE_CLIENT_ERROR, + error.name, + error.message, + error.stack, + {type: 'WorkerError'}, + ]); + } + } + + onMessage(response: any /* Should be ParentMessage */) { + let error; + + switch (response[0]) { + case PARENT_MESSAGE_OK: + this._onProcessEnd(null, response[1]); + break; + + case PARENT_MESSAGE_CLIENT_ERROR: + error = response[4]; + + if (error != null && typeof error === 'object') { + const extra = error; + const NativeCtor = global[response[1]]; + const Ctor = typeof NativeCtor === 'function' ? NativeCtor : Error; + + error = new Ctor(response[2]); + // $FlowFixMe: adding custom properties to errors. + error.type = response[1]; + error.stack = response[3]; + + for (const key in extra) { + // $FlowFixMe: adding custom properties to errors. + error[key] = extra[key]; + } + } + + this._onProcessEnd(error, null); + break; + case PARENT_MESSAGE_SETUP_ERROR: + error = new Error('Error when calling setup: ' + response[2]); + + // $FlowFixMe: adding custom properties to errors. + error.type = response[1]; + error.stack = response[3]; + + this._onProcessEnd(error, null); + break; + default: + throw new TypeError('Unexpected response from worker: ' + response[0]); + } + } + + onExit(exitCode: number) { + if (exitCode !== 0) { + this.initialize(); + } + } + + send(request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd) { + onProcessStart(this); + this._onProcessEnd = onProcessEnd; + + this._retries = 0; + + this._worker.postMessage(request); + } + + getWorkerId(): number { + return this._options.workerId; + } + + getStdout(): Readable { + return this._worker.stdout; + } + + getStderr(): Readable { + return this._worker.stderr; + } +} diff --git a/packages/jest-worker/src/__tests__/Worker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js similarity index 71% rename from packages/jest-worker/src/__tests__/Worker.test.js rename to packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index cad8347df5b9..5e328bb666f0 100644 --- a/packages/jest-worker/src/__tests__/Worker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -16,7 +16,7 @@ import { CHILD_MESSAGE_INITIALIZE, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_OK, -} from '../types'; +} from '../../types'; let Worker; let forkInterface; @@ -38,7 +38,7 @@ beforeEach(() => { return forkInterface; }); - Worker = require('../Worker').default; + Worker = require('../ChildProcessWorker').default; }); afterEach(() => { @@ -47,7 +47,7 @@ afterEach(() => { }); it('passes fork options down to child_process.fork, adding the defaults', () => { - const child = require.resolve('../child'); + const child = require.resolve('../processChild'); process.execArgv = ['--inspect', '-p']; @@ -119,6 +119,7 @@ it('stops initializing the worker after the amount of retries is exceeded', () = expect(childProcess.fork).toHaveBeenCalledTimes(5); expect(onProcessStart).toBeCalledWith(worker); + expect(onProcessEnd).toHaveBeenCalledTimes(1); expect(onProcessEnd.mock.calls[0][0]).toBeInstanceOf(Error); expect(onProcessEnd.mock.calls[0][0].type).toBe('WorkerError'); expect(onProcessEnd.mock.calls[0][1]).toBe(null); @@ -135,30 +136,11 @@ it('provides stdout and stderr fields from the child process', () => { expect(worker.getStderr()).toBe(forkInterface.stderr); }); -it('swtiches the processed flag of a task as soon as it is processed', () => { - const worker = new Worker({ - forkOptions: {}, - maxRetries: 3, - workerPath: '/tmp/foo', - }); - - const request1 = [CHILD_MESSAGE_CALL, false, 'foo', []]; - const request2 = [CHILD_MESSAGE_CALL, false, 'bar', []]; - - worker.send(request1, () => {}, () => {}); - worker.send(request2, () => {}, () => {}); - - // The queue is empty when it got send, so the task is processed. - expect(request1[1]).toBe(true); - - // The previous one is being processed, so that one stays as unprocessed. - expect(request2[1]).toBe(false); -}); - it('sends the task to the child process', () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, + setupArgs: [], workerPath: '/tmp/foo', }); @@ -170,50 +152,6 @@ it('sends the task to the child process', () => { expect(forkInterface.send.mock.calls[1][0]).toEqual(request); }); -it('relates replies to requests, in order', () => { - const worker = new Worker({ - forkOptions: {}, - maxRetries: 3, - workerPath: '/tmp/foo', - }); - - const onProcessStart1 = jest.fn(); - const onProcessEnd1 = jest.fn(); - const request1 = [CHILD_MESSAGE_CALL, false, 'foo', []]; - - const onProcessStart2 = jest.fn(); - const onProcessEnd2 = jest.fn(); - const request2 = [CHILD_MESSAGE_CALL, false, 'bar', []]; - - worker.send(request1, onProcessStart1, onProcessEnd1); - worker.send(request2, onProcessStart2, onProcessEnd2); - - // 2nd call waits on the queue... - expect(request2[1]).toBe(false); - - // then first call replies... - forkInterface.emit('message', [PARENT_MESSAGE_OK, 44]); - - expect(onProcessStart1.mock.calls[0][0]).toBe(worker); - expect(onProcessEnd1.mock.calls[0][0]).toBeFalsy(); - expect(onProcessEnd1.mock.calls[0][1]).toBe(44); - - // which causes the second call to be processed... - expect(request2[1]).toBe(true); - - // and then the second call replies... - forkInterface.emit('message', [ - PARENT_MESSAGE_CLIENT_ERROR, - 'TypeError', - 'foo', - 'TypeError: foo', - {}, - ]); - - expect(onProcessStart2.mock.calls[0][0]).toBe(worker); - expect(onProcessEnd2.mock.calls[0][0].message).toBe('foo'); -}); - it('calls the onProcessStart method synchronously if the queue is empty', () => { const worker = new Worker({ forkOptions: {}, @@ -240,40 +178,6 @@ it('calls the onProcessStart method synchronously if the queue is empty', () => expect(onProcessEnd).toHaveBeenCalledTimes(1); }); -it('calls the onProcessStart method only when the request is starting to be processed', () => { - const worker = new Worker({ - forkOptions: {}, - maxRetries: 3, - workerPath: '/tmp/foo', - }); - - const onProcessStart1 = jest.fn(); - const onProcessEnd1 = jest.fn(); - - const onProcessStart2 = jest.fn(); - const onProcessEnd2 = jest.fn(); - - worker.send( - [CHILD_MESSAGE_CALL, false, 'foo', []], - onProcessStart1, - onProcessEnd1, - ); - worker.send( - [CHILD_MESSAGE_CALL, false, 'bar', []], - onProcessStart2, - onProcessEnd2, - ); - - // Not called yet since the second request is on the queue. - expect(onProcessStart2).not.toHaveBeenCalled(); - - // then first call replies... - forkInterface.emit('message', [PARENT_MESSAGE_OK]); - - // Now it's been called. - expect(onProcessStart2).toHaveBeenCalledTimes(1); -}); - it('creates error instances for known errors', () => { const worker = new Worker({ forkOptions: {}, diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js new file mode 100644 index 000000000000..db4cfaa06a0b --- /dev/null +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -0,0 +1,282 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +/* eslint-disable no-new */ + +import { + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_INITIALIZE, + PARENT_MESSAGE_OK, + PARENT_MESSAGE_CLIENT_ERROR, +} from '../../types'; + +let Worker; +let childProcess; +let originalExecArgv; + +beforeEach(() => { + jest.mock('worker_threads', () => { + const fakeClass = jest.fn(() => { + const EventEmitter = require('events'); + const thread = new EventEmitter(); + thread.postMessage = jest.fn(); + thread.stdout = 'stdout'; + thread.stderr = 'stderr'; + return thread; + }); + + return { + Worker: fakeClass, + }; + }); + originalExecArgv = process.execArgv; + + childProcess = require('worker_threads').Worker; + childProcess.postMessage = jest.fn(); + + Worker = require('../NodeThreadsWorker').default; +}); + +afterEach(() => { + jest.resetModules(); + process.execArgv = originalExecArgv; +}); + +it('passes fork options down to child_process.fork, adding the defaults', () => { + const child = require.resolve('../threadChild'); + + process.execArgv = ['--inspect', '-p']; + + new Worker({ + forkOptions: { + cwd: '/tmp', + execPath: 'hello', + }, + maxRetries: 3, + workerId: process.env.JEST_WORKER_ID, + workerPath: '/tmp/foo/bar/baz.js', + }); + + expect(childProcess.mock.calls[0][0]).toBe(child); + expect(childProcess.mock.calls[0][1]).toEqual({ + eval: false, + stderr: true, + stdout: true, + workerData: { + cwd: '/tmp', // Overridden default option. + env: process.env, // Default option. + execArgv: ['-p'], // Filtered option. + execPath: 'hello', // Added option. + silent: true, // Default option. + }, + }); +}); + +it('passes workerId to the child process and assign it to env.JEST_WORKER_ID', () => { + new Worker({ + forkOptions: {}, + maxRetries: 3, + workerId: 2, + workerPath: '/tmp/foo', + }); + + expect(childProcess.mock.calls[0][1].workerData.env.JEST_WORKER_ID).toEqual( + 2, + ); +}); + +it('initializes the child process with the given workerPath', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + setupArgs: ['foo', 'bar'], + workerPath: '/tmp/foo/bar/baz.js', + }); + + expect(worker._worker.postMessage.mock.calls[0][0]).toEqual([ + CHILD_MESSAGE_INITIALIZE, + false, + '/tmp/foo/bar/baz.js', + ['foo', 'bar'], + ]); +}); + +it('stops initializing the worker after the amount of retries is exceeded', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo/bar/baz.js', + }); + + const request = [CHILD_MESSAGE_CALL, false, 'foo', []]; + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + + worker.send(request, onProcessStart, onProcessEnd); + + // We fail four times (initial + three retries). + worker._worker.emit('exit'); + worker._worker.emit('exit'); + worker._worker.emit('exit'); + worker._worker.emit('exit'); + + expect(childProcess).toHaveBeenCalledTimes(5); + expect(onProcessStart).toBeCalledWith(worker); + expect(onProcessEnd).toHaveBeenCalledTimes(1); + expect(onProcessEnd.mock.calls[0][0]).toBeInstanceOf(Error); + expect(onProcessEnd.mock.calls[0][0].type).toBe('WorkerError'); + expect(onProcessEnd.mock.calls[0][1]).toBe(null); +}); + +it('provides stdout and stderr fields from the child process', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + expect(worker.getStdout()).toBe('stdout'); + expect(worker.getStderr()).toBe('stderr'); +}); + +it('sends the task to the child process', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const request = [CHILD_MESSAGE_CALL, false, 'foo', []]; + + worker.send(request, () => {}, () => {}); + + // Skipping call "0" because it corresponds to the "initialize" one. + expect(worker._worker.postMessage.mock.calls[1][0]).toEqual(request); +}); + +it('calls the onProcessStart method synchronously if the queue is empty', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, false, 'foo', []], + onProcessStart, + onProcessEnd, + ); + + // Only onProcessStart has been called + expect(onProcessStart).toHaveBeenCalledTimes(1); + expect(onProcessEnd).not.toHaveBeenCalled(); + + // then first call replies... + worker._worker.emit('message', [PARENT_MESSAGE_OK]); + + expect(onProcessEnd).toHaveBeenCalledTimes(1); +}); + +it('creates error instances for known errors', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const callback1 = jest.fn(); + const callback2 = jest.fn(); + const callback3 = jest.fn(); + + // Testing a generic ECMAScript error. + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, callback1); + + worker._worker.emit('message', [ + PARENT_MESSAGE_CLIENT_ERROR, + 'TypeError', + 'bar', + 'TypeError: bar', + {}, + ]); + + expect(callback1.mock.calls[0][0]).toBeInstanceOf(TypeError); + expect(callback1.mock.calls[0][0].message).toBe('bar'); + expect(callback1.mock.calls[0][0].type).toBe('TypeError'); + expect(callback1.mock.calls[0][0].stack).toBe('TypeError: bar'); + + // Testing a custom error. + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, callback2); + + worker._worker.emit('message', [ + PARENT_MESSAGE_CLIENT_ERROR, + 'RandomCustomError', + 'bar', + 'RandomCustomError: bar', + {qux: 'extra property'}, + ]); + + expect(callback2.mock.calls[0][0]).toBeInstanceOf(Error); + expect(callback2.mock.calls[0][0].message).toBe('bar'); + expect(callback2.mock.calls[0][0].type).toBe('RandomCustomError'); + expect(callback2.mock.calls[0][0].stack).toBe('RandomCustomError: bar'); + expect(callback2.mock.calls[0][0].qux).toBe('extra property'); + + // Testing a non-object throw. + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, callback3); + + worker._worker.emit('message', [ + PARENT_MESSAGE_CLIENT_ERROR, + 'Number', + null, + null, + 412, + ]); + + expect(callback3.mock.calls[0][0]).toBe(412); +}); + +it('throws when the child process returns a strange message', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, () => {}); + + // Type 27 does not exist. + expect(() => { + worker._worker.emit('message', [27]); + }).toThrow(TypeError); +}); + +it('does not restart the child if it cleanly exited', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + expect(childProcess).toHaveBeenCalledTimes(1); + worker._worker.emit('exit', 0); + expect(childProcess).toHaveBeenCalledTimes(1); +}); + +it('restarts the child when the child process dies', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + expect(childProcess).toHaveBeenCalledTimes(1); + worker._worker.emit('exit', 1); + expect(childProcess).toHaveBeenCalledTimes(2); +}); diff --git a/packages/jest-worker/src/__tests__/child.test.js b/packages/jest-worker/src/workers/__tests__/processChild.test.js similarity index 99% rename from packages/jest-worker/src/__tests__/child.test.js rename to packages/jest-worker/src/workers/__tests__/processChild.test.js index ede79ad82858..1e4ac3ffd70e 100644 --- a/packages/jest-worker/src/__tests__/child.test.js +++ b/packages/jest-worker/src/workers/__tests__/processChild.test.js @@ -20,7 +20,7 @@ import { CHILD_MESSAGE_END, PARENT_MESSAGE_OK, PARENT_MESSAGE_CLIENT_ERROR, -} from '../types'; +} from '../../types'; let ended; let mockCount; @@ -106,7 +106,7 @@ beforeEach(() => { process.send = jest.fn(); // Require the child! - require('../child'); + require('../processChild'); }); afterEach(() => { diff --git a/packages/jest-worker/src/workers/__tests__/threadChild.test.js b/packages/jest-worker/src/workers/__tests__/threadChild.test.js new file mode 100644 index 000000000000..b0d3c41cd105 --- /dev/null +++ b/packages/jest-worker/src/workers/__tests__/threadChild.test.js @@ -0,0 +1,407 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +jest.mock('worker_threads', () => { + const EventEmitter = require('events'); + const thread = new EventEmitter(); + thread.postMessage = jest.fn(); + + return { + isMainThread: false, + parentPort: thread, + }; +}); +let thread; + +const mockError = new TypeError('Booo'); +const mockExtendedError = new ReferenceError('Booo extended'); +const uninitializedParam = {}; +const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); + +import { + CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_END, + PARENT_MESSAGE_OK, + PARENT_MESSAGE_CLIENT_ERROR, +} from '../../types'; + +let ended; +let mockCount; +let initializeParm = uninitializedParam; + +beforeEach(() => { + mockCount = 0; + ended = false; + + jest.mock( + '../my-fancy-worker', + () => { + mockCount++; + + return { + fooPromiseThrows() { + return new Promise((resolve, reject) => { + setTimeout(() => reject(mockError), 5); + }); + }, + + fooPromiseWorks() { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(1989), 5); + }); + }, + + fooThrows() { + throw mockError; + }, + + fooThrowsANumber() { + // eslint-disable-next-line no-throw-literal + throw 412; + }, + + fooThrowsAnErrorWithExtraProperties() { + mockExtendedError.baz = 123; + mockExtendedError.qux = 456; + + throw mockExtendedError; + }, + + fooThrowsNull() { + // eslint-disable-next-line no-throw-literal + throw null; + }, + + fooWorks() { + return 1989; + }, + + setup(param) { + initializeParm = param; + }, + + teardown() { + ended = true; + }, + }; + }, + {virtual: true}, + ); + + jest.mock( + '../my-fancy-standalone-worker', + () => jest.fn().mockImplementation(() => 12345), + {virtual: true}, + ); + + // This mock emulates a transpiled Babel module that carries a default export + // that corresponds to a method. + jest.mock( + '../my-fancy-babel-worker', + () => ({ + __esModule: true, + default: jest.fn().mockImplementation(() => 67890), + }), + {virtual: true}, + ); + + thread = require('worker_threads').parentPort; + + process.exit = jest.fn(); + + // Require the child! + require('../threadChild'); +}); + +beforeEach(() => { + process.exit.mockClear(); + thread.postMessage.mockClear(); +}); + +afterEach(() => { + jest.resetModules(); + + thread.removeAllListeners('message'); +}); + +it('lazily requires the file', () => { + expect(mockCount).toBe(0); + + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + expect(mockCount).toBe(0); + expect(initializeParm).toBe(uninitializedParam); // Not called yet. + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + + expect(mockCount).toBe(1); + expect(initializeParm).toBe(undefined); +}); + +it('calls initialize with the correct arguments', () => { + expect(mockCount).toBe(0); + + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ['foo'], // Pass empty initialize params so the initialize method is called. + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + + expect(initializeParm).toBe('foo'); +}); + +it('returns results immediately when function is synchronous', () => { + thread.send = jest.fn(); + + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + + expect(thread.postMessage.mock.calls[0][0]).toEqual([ + PARENT_MESSAGE_OK, + 1989, + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrows', + [], + ]); + + expect(thread.postMessage.mock.calls[1][0]).toEqual([ + PARENT_MESSAGE_CLIENT_ERROR, + 'TypeError', + 'Booo', + mockError.stack, + {}, + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrowsANumber', + [], + ]); + + expect(thread.postMessage.mock.calls[2][0]).toEqual([ + PARENT_MESSAGE_CLIENT_ERROR, + 'Number', + void 0, + void 0, + 412, + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrowsAnErrorWithExtraProperties', + [], + ]); + + expect(thread.postMessage.mock.calls[3][0]).toEqual([ + PARENT_MESSAGE_CLIENT_ERROR, + 'ReferenceError', + 'Booo extended', + mockExtendedError.stack, + {baz: 123, qux: 456}, + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrowsNull', + [], + ]); + + expect(thread.postMessage.mock.calls[4][0][0]).toBe( + PARENT_MESSAGE_CLIENT_ERROR, + ); + expect(thread.postMessage.mock.calls[4][0][1]).toBe('Error'); + expect(thread.postMessage.mock.calls[4][0][2]).toEqual( + '"null" or "undefined" thrown', + ); + + expect(thread.postMessage).toHaveBeenCalledTimes(5); +}); + +it('returns results when it gets resolved if function is asynchronous', async () => { + jest.useRealTimers(); + + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooPromiseWorks', + [], + ]); + + await sleep(10); + + expect(thread.postMessage.mock.calls[0][0]).toEqual([ + PARENT_MESSAGE_OK, + 1989, + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooPromiseThrows', + [], + ]); + + await sleep(10); + + expect(thread.postMessage.mock.calls[1][0]).toEqual([ + PARENT_MESSAGE_CLIENT_ERROR, + 'TypeError', + 'Booo', + mockError.stack, + {}, + ]); + + expect(thread.postMessage).toHaveBeenCalledTimes(2); +}); + +it('calls the main module if the method call is "default"', () => { + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-standalone-worker', + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'default', + [], + ]); + + expect(thread.postMessage.mock.calls[0][0]).toEqual([ + PARENT_MESSAGE_OK, + 12345, + ]); +}); + +it('calls the main export if the method call is "default" and it is a Babel transpiled one', () => { + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-babel-worker', + ]); + + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'default', + [], + ]); + + expect(thread.postMessage.mock.calls[0][0]).toEqual([ + PARENT_MESSAGE_OK, + 67890, + ]); +}); + +it('finishes the process with exit code 0 if requested', () => { + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + thread.emit('message', [ + CHILD_MESSAGE_END, + true, // Not really used here, but for flow type purity. + ]); + + expect(process.exit).toHaveBeenCalledWith(0); +}); + +it('calls the teardown method ', () => { + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + thread.emit('message', [ + CHILD_MESSAGE_END, + true, // Not really used here, but for flow type purity. + ]); + + expect(ended).toBe(true); +}); + +it('throws if an invalid message is detected', () => { + // Type 27 does not exist. + expect(() => { + thread.emit('message', [27]); + }).toThrow(TypeError); +}); + +it('throws if child is not forked', () => { + delete thread.postMessage; + + thread.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + expect(() => { + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + }).toThrow(); + + expect(() => { + thread.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrows', + [], + ]); + }).toThrow(); +}); diff --git a/packages/jest-worker/src/child.js b/packages/jest-worker/src/workers/processChild.js similarity index 91% rename from packages/jest-worker/src/child.js rename to packages/jest-worker/src/workers/processChild.js index 94b02f109bb5..39359b81ec87 100644 --- a/packages/jest-worker/src/child.js +++ b/packages/jest-worker/src/workers/processChild.js @@ -16,9 +16,13 @@ import { PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_SETUP_ERROR, PARENT_MESSAGE_OK, -} from './types'; +} from '../types'; -import type {PARENT_MESSAGE_ERROR} from './types'; +import type { + ChildMessageInitialize, + ChildMessageCall, + PARENT_MESSAGE_ERROR, +} from '../types'; let file = null; let setupArgs: Array = []; @@ -37,15 +41,17 @@ let initialized = false; * If an invalid message is detected, the child will exit (by throwing) with a * non-zero exit code. */ -process.on('message', (request: any /* Should be ChildMessage */) => { +process.on('message', (request: any) => { switch (request[0]) { case CHILD_MESSAGE_INITIALIZE: - file = request[2]; + const init: ChildMessageInitialize = request; + file = init[2]; setupArgs = request[3]; break; case CHILD_MESSAGE_CALL: - execMethod(request[2], request[3]); + const call: ChildMessageCall = request; + execMethod(call[2], call[3]); break; case CHILD_MESSAGE_END: diff --git a/packages/jest-worker/src/workers/threadChild.js b/packages/jest-worker/src/workers/threadChild.js new file mode 100644 index 000000000000..77c7a96fa6d3 --- /dev/null +++ b/packages/jest-worker/src/workers/threadChild.js @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import { + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_END, + CHILD_MESSAGE_INITIALIZE, + PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_SETUP_ERROR, + PARENT_MESSAGE_OK, +} from '../types'; + +import type { + ChildMessageInitialize, + ChildMessageCall, + PARENT_MESSAGE_ERROR, +} from '../types'; + +let file = null; +let setupArgs: Array = []; +let initialized = false; + +/* eslint-disable import/no-unresolved */ +// $FlowFixMe: Flow doesn't support experimental node modules +import {parentPort, isMainThread} from 'worker_threads'; +/* eslint-enable import/no-unresolved */ + +/** + * This file is a small bootstrapper for workers. It sets up the communication + * between the worker and the parent process, interpreting parent messages and + * sending results back. + * + * The file loaded will be lazily initialized the first time any of the workers + * is called. This is done for optimal performance: if the farm is initialized, + * but no call is made to it, child Node processes will be consuming the least + * possible amount of memory. + * + * If an invalid message is detected, the child will exit (by throwing) with a + * non-zero exit code. + */ +parentPort.on('message', (request: any) => { + switch (request[0]) { + case CHILD_MESSAGE_INITIALIZE: + const init: ChildMessageInitialize = request; + file = init[2]; + setupArgs = request[3]; + break; + + case CHILD_MESSAGE_CALL: + const call: ChildMessageCall = request; + execMethod(call[2], call[3]); + break; + + case CHILD_MESSAGE_END: + end(); + break; + + default: + throw new TypeError( + 'Unexpected request from parent process: ' + request[0], + ); + } +}); + +function reportSuccess(result: any) { + if (isMainThread) { + throw new Error('Child can only be used on a forked process'); + } + + parentPort.postMessage([PARENT_MESSAGE_OK, result]); +} + +function reportClientError(error: Error) { + return reportError(error, PARENT_MESSAGE_CLIENT_ERROR); +} + +function reportInitializeError(error: Error) { + return reportError(error, PARENT_MESSAGE_SETUP_ERROR); +} + +function reportError(error: Error, type: PARENT_MESSAGE_ERROR) { + if (isMainThread) { + throw new Error('Child can only be used on a forked process'); + } + + if (error == null) { + error = new Error('"null" or "undefined" thrown'); + } + + parentPort.postMessage([ + type, + error.constructor && error.constructor.name, + error.message, + error.stack, + // $FlowFixMe: this is safe to just inherit from Object. + typeof error === 'object' ? Object.assign({}, error) : error, + ]); +} + +function end(): void { + // $FlowFixMe: This has to be a dynamic require. + const main = require(file); + + if (!main.teardown) { + exitProcess(); + + return; + } + + execFunction(main.teardown, main, [], exitProcess, exitProcess); +} + +function exitProcess(): void { + process.exit(0); +} + +function execMethod(method: string, args: $ReadOnlyArray): void { + // $FlowFixMe: This has to be a dynamic require. + const main = require(file); + + let fn; + + if (method === 'default') { + fn = main.__esModule ? main['default'] : main; + } else { + fn = main[method]; + } + + function execHelper() { + execFunction(fn, main, args, reportSuccess, reportClientError); + } + + if (initialized || !main.setup) { + execHelper(); + + return; + } + + initialized = true; + + execFunction(main.setup, main, setupArgs, execHelper, reportInitializeError); +} + +function execFunction( + fn: (...args: $ReadOnlyArray) => mixed, + ctx: mixed, + args: $ReadOnlyArray, + onResult: (result: mixed) => void, + onError: (error: Error) => void, +): void { + let result; + + try { + result = fn.apply(ctx, args); + } catch (err) { + onError(err); + + return; + } + + if (result && typeof result.then === 'function') { + result.then(onResult, onError); + } else { + onResult(result); + } +}