From 4da4a9a2b79f01fe31b3414a93d8d9c56a289f9b Mon Sep 17 00:00:00 2001 From: wkcn Date: Fri, 8 Mar 2019 11:37:41 +0800 Subject: [PATCH 1/3] Support multi-threading for Custom Operator --- docs/faq/env_var.md | 3 +++ src/operator/custom/custom-inl.h | 34 ++++++++++++++++++-------------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/docs/faq/env_var.md b/docs/faq/env_var.md index 095c214e66b3..5eda879da460 100644 --- a/docs/faq/env_var.md +++ b/docs/faq/env_var.md @@ -60,6 +60,9 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0 * MXNET_MP_OPENCV_NUM_THREADS - Values: Int ```(default=0)``` - The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarge this number may boost the performance of individual workers when executing underlying OpenCV functions but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows). +* MXNET_CUSTOM_OP_NUM_THREADS + - Values: Int ```(default=4)``` + - The number of threads given to custom operators. ## Memory Options diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index de82f3e8eef0..4c77704dc3c1 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -139,7 +139,8 @@ class CustomOperator { destructing_ = true; cv_.notify_all(); } - worker_.join(); + for (auto &worker : workers_) + worker.join(); } static CustomOperator* Get(); @@ -150,27 +151,30 @@ class CustomOperator { naive_engine_ = true; if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) { naive_engine_ = false; - worker_ = std::thread( - [&]() { - std::unique_lock lock(mutex_); - while (!q_.empty() || !destructing_) { - cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); - while (!q_.empty()) { - auto fn = q_.front(); - lock.unlock(); - fn(); - lock.lock(); - q_.pop(); + const int num_threads = dmlc::GetEnv("MXNET_CUSTOM_OP_NUM_THREADS", 4); + for (int i = 0; i < num_threads; ++i) { + workers_.emplace_back(std::thread( + [&]() { + std::unique_lock lock(mutex_); + while (!q_.empty() || !destructing_) { + cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); + while (!q_.empty()) { + auto fn = q_.front(); + q_.pop(); + lock.unlock(); + fn(); + lock.lock(); + } } - } - }); + })); + } } } std::mutex mutex_; std::map registry_; // async worker std::condition_variable cv_; - std::thread worker_; + std::vector workers_; std::queue > q_; bool naive_engine_; bool destructing_; From dff6c5a51e5cf066833f0969e6c886e452e37c3e Mon Sep 17 00:00:00 2001 From: wkcn Date: Fri, 8 Mar 2019 12:32:12 +0800 Subject: [PATCH 2/3] update --- docs/faq/env_var.md | 4 +-- src/operator/custom/custom-inl.h | 47 ++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/docs/faq/env_var.md b/docs/faq/env_var.md index 5eda879da460..2768f644c066 100644 --- a/docs/faq/env_var.md +++ b/docs/faq/env_var.md @@ -61,8 +61,8 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0 - Values: Int ```(default=0)``` - The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarge this number may boost the performance of individual workers when executing underlying OpenCV functions but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows). * MXNET_CUSTOM_OP_NUM_THREADS - - Values: Int ```(default=4)``` - - The number of threads given to custom operators. + - Values: Int ```(default=16)``` + - The maximum number of threads given to custom operators. ## Memory Options diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index 4c77704dc3c1..e4ed753d9942 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -129,6 +129,9 @@ class CustomOperator { ctx.run_ctx.ctx, vars, vars2, FnProperty::kNormal, 0, "CustomOperator"); }); + // increase num_threads if there is not enough threads to execute custom operator + if (q_.size() > num_free_threads) + CreateThreads(q_.size() - num_free_threads); cv_.notify_all(); } @@ -146,35 +149,45 @@ class CustomOperator { static CustomOperator* Get(); private: - CustomOperator() { + CustomOperator() : num_free_threads(0) { destructing_ = false; naive_engine_ = true; if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) { naive_engine_ = false; - const int num_threads = dmlc::GetEnv("MXNET_CUSTOM_OP_NUM_THREADS", 4); - for (int i = 0; i < num_threads; ++i) { - workers_.emplace_back(std::thread( - [&]() { - std::unique_lock lock(mutex_); - while (!q_.empty() || !destructing_) { - cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); - while (!q_.empty()) { - auto fn = q_.front(); - q_.pop(); - lock.unlock(); - fn(); - lock.lock(); - } - } - })); + SetNumThreads(1); + } + } + void ThreadTarget() { + std::unique_lock lock(mutex_); + while (!q_.empty() || !destructing_) { + cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); + while (!q_.empty()) { + --num_free_threads; + auto fn = q_.front(); + q_.pop(); + lock.unlock(); + fn(); + ++num_free_threads; + lock.lock(); } } } + void SetNumThreads(int num_threads) { + num_threads = std::min(dmlc::GetEnv("MXNET_CUSTOM_OP_NUM_THREADS", 16), num_threads); + for (int i = workers_.size(); i < num_threads; ++i) { + workers_.emplace_back(std::thread([this]{this->ThreadTarget();})); + ++num_free_threads; + } + } + void CreateThreads(int num_new_threads) { + SetNumThreads(workers_.size() + num_new_threads); + } std::mutex mutex_; std::map registry_; // async worker std::condition_variable cv_; std::vector workers_; + std::atomic num_free_threads; std::queue > q_; bool naive_engine_; bool destructing_; From f6fd6ca3b54c9d28b09556716820c902a5ffa769 Mon Sep 17 00:00:00 2001 From: JackieWu Date: Fri, 8 Mar 2019 13:35:55 +0800 Subject: [PATCH 3/3] Update custom-inl.h --- src/operator/custom/custom-inl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index e4ed753d9942..f88e830bc573 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -154,7 +155,6 @@ class CustomOperator { naive_engine_ = true; if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) { naive_engine_ = false; - SetNumThreads(1); } } void ThreadTarget() {