From 4314dbfce9639ddef9947b21b33fe704aecd4215 Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Tue, 12 Mar 2019 00:42:31 +0800 Subject: [PATCH] worker: create per-Environment message port after bootstrap PR-URL: /~https://github.com/nodejs/node/pull/26593 Reviewed-By: Gus Caplan Reviewed-By: James M Snell --- lib/internal/bootstrap/node.js | 2 +- lib/internal/process/worker_thread_only.js | 29 ++++++++------------ lib/internal/worker/io.js | 18 ++++++++++-- src/node_worker.cc | 32 +++++++++++----------- src/node_worker.h | 2 +- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js index 44996688a5a9fd..d9ad263ca0b46f 100644 --- a/lib/internal/bootstrap/node.js +++ b/lib/internal/bootstrap/node.js @@ -146,7 +146,7 @@ if (isMainThread) { setupProcessStdio(getStdout, getStdin, getStderr); } else { const { getStdout, getStdin, getStderr } = - workerThreadSetup.initializeWorkerStdio(); + workerThreadSetup.createStdioGetters(); setupProcessStdio(getStdout, getStdin, getStderr); } diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js index 2cc52cbf01b8cd..b6529e43441679 100644 --- a/lib/internal/process/worker_thread_only.js +++ b/lib/internal/process/worker_thread_only.js @@ -2,32 +2,25 @@ // This file contains process bootstrappers that can only be // run in the worker thread. -const { - getEnvMessagePort -} = internalBinding('worker'); const { - kWaitingStreams, - ReadableWorkerStdio, - WritableWorkerStdio + createWorkerStdio } = require('internal/worker/io'); const { codes: { ERR_WORKER_UNSUPPORTED_OPERATION } } = require('internal/errors'); -const workerStdio = {}; - -function initializeWorkerStdio() { - const port = getEnvMessagePort(); - port[kWaitingStreams] = 0; - workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin'); - workerStdio.stdout = new WritableWorkerStdio(port, 'stdout'); - workerStdio.stderr = new WritableWorkerStdio(port, 'stderr'); +let workerStdio; +function lazyWorkerStdio() { + if (!workerStdio) workerStdio = createWorkerStdio(); + return workerStdio; +} +function createStdioGetters() { return { - getStdout() { return workerStdio.stdout; }, - getStderr() { return workerStdio.stderr; }, - getStdin() { return workerStdio.stdin; } + getStdout() { return lazyWorkerStdio().stdout; }, + getStderr() { return lazyWorkerStdio().stderr; }, + getStdin() { return lazyWorkerStdio().stdin; } }; } @@ -55,7 +48,7 @@ function unavailable(name) { } module.exports = { - initializeWorkerStdio, + createStdioGetters, unavailable, wrapProcessMethods }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 2f1352fdf910b8..664055b5c5b9af 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -11,7 +11,10 @@ const { moveMessagePortToContext, stopMessagePort } = internalBinding('messaging'); -const { threadId } = internalBinding('worker'); +const { + threadId, + getEnvMessagePort +} = internalBinding('worker'); const { Readable, Writable } = require('stream'); const EventEmitter = require('events'); @@ -227,6 +230,16 @@ class WritableWorkerStdio extends Writable { } } +function createWorkerStdio() { + const port = getEnvMessagePort(); + port[kWaitingStreams] = 0; + return { + stdin: new ReadableWorkerStdio(port, 'stdin'), + stdout: new WritableWorkerStdio(port, 'stdout'), + stderr: new WritableWorkerStdio(port, 'stderr') + }; +} + module.exports = { drainMessagePort, messageTypes, @@ -239,5 +252,6 @@ module.exports = { MessageChannel, setupPortReferencing, ReadableWorkerStdio, - WritableWorkerStdio + WritableWorkerStdio, + createWorkerStdio }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 24bd76c1ff400b..f4c8e402cbb10c 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -264,22 +264,6 @@ void Worker::Run() { Debug(this, "Created Environment for worker with id %llu", thread_id_); if (is_stopped()) return; { - HandleScope handle_scope(isolate_); - Mutex::ScopedLock lock(mutex_); - // Set up the message channel for receiving messages in the child. - child_port_ = MessagePort::New(env_.get(), - env_->context(), - std::move(child_port_data_)); - // MessagePort::New() may return nullptr if execution is terminated - // within it. - if (child_port_ != nullptr) - env_->set_message_port(child_port_->object(isolate_)); - - Debug(this, "Created message port for worker %llu", thread_id_); - } - - if (is_stopped()) return; - { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR StartWorkerInspector(env_.get(), std::move(inspector_parent_handle_), @@ -291,6 +275,9 @@ void Worker::Run() { AsyncCallbackScope callback_scope(env_.get()); env_->async_hooks()->push_async_ids(1, 0); if (!RunBootstrapping(env_.get()).IsEmpty()) { + CreateEnvMessagePort(env_.get()); + if (is_stopped()) return; + Debug(this, "Created message port for worker %llu", thread_id_); USE(StartExecution(env_.get(), "internal/main/worker_thread")); } @@ -343,6 +330,19 @@ void Worker::Run() { Debug(this, "Worker %llu thread stops", thread_id_); } +void Worker::CreateEnvMessagePort(Environment* env) { + HandleScope handle_scope(isolate_); + Mutex::ScopedLock lock(mutex_); + // Set up the message channel for receiving messages in the child. + child_port_ = MessagePort::New(env, + env->context(), + std::move(child_port_data_)); + // MessagePort::New() may return nullptr if execution is terminated + // within it. + if (child_port_ != nullptr) + env->set_message_port(child_port_->object(isolate_)); +} + void Worker::JoinThread() { if (thread_joined_) return; diff --git a/src/node_worker.h b/src/node_worker.h index 43b92ed40ed8bb..b23ab704b0e0af 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -50,7 +50,7 @@ class Worker : public AsyncWrap { private: void OnThreadStopped(); - + void CreateEnvMessagePort(Environment* env); const std::string url_; std::shared_ptr per_isolate_opts_;