Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events waiter #36480

Merged
merged 17 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion paddle/fluid/framework/new_executor/event_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@
#include <cstdlib>
#include <mutex>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"

namespace paddle {
namespace framework {

void* AlignedMalloc(size_t size, size_t alignment);
void AlignedFree(void* memory_ptr);

class EventCount {
public:
class Waiter;
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
main_program_(main_prog),
global_scope_(global_scope),
stream_analyzer_(place),
async_work_queue_(kHostNumThreads) {
async_work_queue_(kHostNumThreads, &main_thread_blocker_) {
is_build_ = false;

feed_names_ = feed_names;
Expand Down Expand Up @@ -367,7 +367,8 @@ void InterpreterCore::ExecuteInstructionList(
}
}

async_work_queue_.WaitEmpty();
auto event_id = main_thread_blocker_.WaitEvent();
VLOG(3) << "event_id " << event_id;

PADDLE_ENFORCE_EQ(
op_run_number_.load(), vec_instr.size(),
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class InterpreterCore {
InterpreterProfiler dry_run_profiler_;
StreamAnalyzer stream_analyzer_;
EventManager event_manager_;
EventsWaiter main_thread_blocker_;
interpretercore::AsyncWorkQueue async_work_queue_;

InterpreterCoreGarbageCollector gc_;
Expand Down
12 changes: 8 additions & 4 deletions paddle/fluid/framework/new_executor/interpretercore_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
Expand All @@ -53,16 +54,19 @@ using AtomicVectorSizeT = std::vector<std::unique_ptr<std::atomic<size_t>>>;

class AsyncWorkQueue {
public:
explicit AsyncWorkQueue(size_t host_num_threads)
AsyncWorkQueue(size_t host_num_threads, EventsWaiter* waiter)
: host_num_thread_(host_num_threads) {
std::vector<WorkQueueOptions> group_options;
// for execute host Kernel
group_options.emplace_back(/*num_threads*/ host_num_threads,
/*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
// for launch device Kernel
group_options.emplace_back(/*num_threads*/ 1,
/*allow_spinning*/ true, /*track_task*/ true);
/*allow_spinning*/ true,
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
queue_group_ = CreateWorkQueueGroup(group_options);
}

Expand All @@ -71,7 +75,7 @@ class AsyncWorkQueue {
AtomicVectorSizeT& PrepareAtomicVarRef(
const std::vector<VariableMetaInfo>& vec_meta_info);

void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }
// void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }

void AddTask(const OpFuncType& op_func_type, std::function<void()> fn) {
queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
Expand Down
30 changes: 9 additions & 21 deletions paddle/fluid/framework/new_executor/nonblocking_threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
namespace paddle {
namespace framework {

template <typename Notifier>
class TaskTracker {
public:
TaskTracker() : wait_empty_cv_(1) {}
TaskTracker() = default;

explicit TaskTracker(Notifier& notifier) : notifier_(&notifier) {}

TaskTracker(const TaskTracker&) = delete;

Expand All @@ -33,32 +36,17 @@ class TaskTracker {

void SubCounter() {
if (1 == num_tasks_.fetch_sub(1, std::memory_order_relaxed)) {
wait_empty_cv_.Notify(true);
if (notifier_ != nullptr) {
notifier_->NotifyEvent();
}
}
}

// only one user can wait at any time
void WaitTaskNumToZero() {
bool waiting = false;
if (!wait_empty_.compare_exchange_strong(waiting, true,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
abort();
}
EventCount::Waiter* w = wait_empty_cv_.GetWaiter(0);
wait_empty_cv_.Prewait();
if (num_tasks_.load(std::memory_order_relaxed) == 0) {
wait_empty_cv_.CancelWait();
} else {
wait_empty_cv_.CommitWait(w);
}
wait_empty_.store(false);
}
uint64_t PendingTaskNum() { return num_tasks_.load(); }

private:
alignas(64) std::atomic<uint64_t> num_tasks_{0};
alignas(64) EventCount wait_empty_cv_;
alignas(64) std::atomic<bool> wait_empty_{false};
Notifier* notifier_{nullptr};
};

template <typename Environment>
Expand Down
46 changes: 18 additions & 28 deletions paddle/fluid/framework/new_executor/workqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ namespace paddle {
namespace framework {
namespace {

using TaskTracker = TaskTracker<EventsWaiter::EventNotifier>;

class WorkQueueImpl : public WorkQueue {
public:
explicit WorkQueueImpl(const WorkQueueOptions& options)
: WorkQueue(options), queue_(nullptr), tracker_(nullptr) {
if (options_.track_task) {
explicit WorkQueueImpl(const WorkQueueOptions& options) : WorkQueue(options) {
if (options_.track_task && options.queue_empty_waiter != nullptr) {
void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
tracker_ = new (storage) TaskTracker;
TaskTracker* tracker = reinterpret_cast<TaskTracker*>(storage);
auto notifier = options.queue_empty_waiter->RegisterEvent(
kQueueEmptyEvent,
[tracker]() { return tracker->PendingTaskNum() == 0; });
tracker_ = new (storage) TaskTracker(*notifier.get());
}
queue_ = new NonblockingThreadPool(options_.num_threads,
options_.allow_spinning);
Expand All @@ -44,20 +49,11 @@ class WorkQueueImpl : public WorkQueue {
queue_->AddTask(std::move(fn));
}

void WaitQueueEmpty() override {
if (tracker_ == nullptr) {
PADDLE_THROW(
platform::errors::Unavailable("set WorkQueueOptions.track_task = "
"true before call this interface."));
}
tracker_->WaitTaskNumToZero();
}

size_t NumThreads() const override { return queue_->NumThreads(); }

private:
NonblockingThreadPool* queue_;
TaskTracker* tracker_;
NonblockingThreadPool* queue_{nullptr};
TaskTracker* tracker_{nullptr};
};

class WorkQueueGroupImpl : public WorkQueueGroup {
Expand All @@ -69,8 +65,6 @@ class WorkQueueGroupImpl : public WorkQueueGroup {

void AddTask(size_t queue_idx, std::function<void()> fn) override;

void WaitQueueGroupEmpty() override;

size_t QueueNumThreads(size_t queue_idx) const override;

size_t QueueGroupNumThreads() const override;
Expand All @@ -92,9 +86,14 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
queues_storage_ = reinterpret_cast<NonblockingThreadPool*>(buffer);
for (size_t idx = 0; idx < num_queues; ++idx) {
const auto& options = queues_options_[idx];
if (options.track_task && tracker_ == nullptr) {
if (options.track_task && tracker_ == nullptr &&
options.queue_empty_waiter != nullptr) {
void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
tracker_ = new (storage) TaskTracker;
TaskTracker* tracker = reinterpret_cast<TaskTracker*>(storage);
auto notifier = options.queue_empty_waiter->RegisterEvent(
kQueueEmptyEvent,
[tracker]() { return tracker->PendingTaskNum() == 0; });
tracker_ = new (storage) TaskTracker(*notifier.get());
}
queues_[idx] = new (&queues_storage_[idx])
NonblockingThreadPool(options.num_threads, options.allow_spinning);
Expand Down Expand Up @@ -124,15 +123,6 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
queues_[queue_idx]->AddTask(std::move(fn));
}

void WorkQueueGroupImpl::WaitQueueGroupEmpty() {
if (nullptr == tracker_) {
PADDLE_THROW(platform::errors::Unavailable(
"set WorkQueueOptions.track_task = true for at least one of queues "
"before call this interface."));
}
tracker_->WaitTaskNumToZero();
}

size_t WorkQueueGroupImpl::QueueNumThreads(size_t queue_idx) const {
assert(queue_idx < queues_.size());
return queues_.at(queue_idx)->NumThreads();
Expand Down
25 changes: 19 additions & 6 deletions paddle/fluid/framework/new_executor/workqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,30 @@
namespace paddle {
namespace framework {

// Name under which a work queue registers its "queue empty" event.
constexpr const char* kQueueEmptyEvent = "QueueEmpty";
class EventsWaiter;

// Construction-time settings for a work queue.
struct WorkQueueOptions {
  // Builds options without a "queue empty" waiter; queue_empty_waiter stays
  // nullptr.
  WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task)
      : WorkQueueOptions(num_threads, allow_spinning, track_task, nullptr) {}

  // Builds options that additionally carry the waiter to be used for the
  // "queue empty" event. The waiter is stored as-is and is not owned.
  WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task,
                   EventsWaiter* waiter)
      : num_threads(num_threads),
        allow_spinning(allow_spinning),
        track_task(track_task),
        queue_empty_waiter(waiter) {}

  size_t num_threads;
  bool allow_spinning;
  // To block the calling thread until the queue is empty, set
  // track_task = true and also set queue_empty_waiter.
  // EventsWaiter::WaitEvent will then block the calling thread until any of
  // the events (including "queue empty") has occurred.
  bool track_task;
  EventsWaiter* queue_empty_waiter{nullptr};  // not owned
};

class WorkQueue {
Expand All @@ -44,9 +59,8 @@ class WorkQueue {

virtual void AddTask(std::function<void()> fn) = 0;

// set WorkQueueOptions.track_task = true before call this
// interface, otherwise will abort()
virtual void WaitQueueEmpty() = 0;
// See WorkQueueOptions.track_task for details
// virtual void WaitQueueEmpty() = 0;

virtual size_t NumThreads() const = 0;

Expand All @@ -67,9 +81,8 @@ class WorkQueueGroup {

virtual void AddTask(size_t queue_idx, std::function<void()> fn) = 0;

// set WorkQueueOptions.track_task = true for at least one of queues
// before call this interface, otherwise will abort()
virtual void WaitQueueGroupEmpty() = 0;
// See WorkQueueOptions.track_task for details
// virtual void WaitQueueGroupEmpty() = 0;

virtual size_t QueueNumThreads(size_t queue_idx) const = 0;

Expand Down
23 changes: 15 additions & 8 deletions paddle/fluid/framework/new_executor/workqueue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
#include <atomic>
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"

TEST(WorkQueue, TestSingleThreadedWorkQueue) {
VLOG(1) << "In Test";
using paddle::framework::WorkQueueOptions;
using paddle::framework::WorkQueue;
using paddle::framework::CreateSingleThreadedWorkQueue;
using paddle::framework::EventsWaiter;
std::atomic<bool> finished{false};
std::atomic<unsigned> counter{0};
constexpr unsigned kLoopNum = 1000000;
// CreateSingleThreadedWorkQueue
EventsWaiter events_waiter;
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, &events_waiter);
auto work_queue = CreateSingleThreadedWorkQueue(options);
// NumThreads
EXPECT_EQ(work_queue->NumThreads(), 1u);
Expand All @@ -42,7 +45,7 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) {
});
// WaitQueueEmpty
EXPECT_EQ(finished.load(), false);
work_queue->WaitQueueEmpty();
events_waiter.WaitEvent();
EXPECT_EQ(finished.load(), true);
EXPECT_EQ(counter.load(), kLoopNum);
}
Expand All @@ -52,13 +55,15 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
using paddle::framework::WorkQueueOptions;
using paddle::framework::WorkQueue;
using paddle::framework::CreateMultiThreadedWorkQueue;
using paddle::framework::EventsWaiter;
std::atomic<bool> finished{false};
std::atomic<unsigned> counter{0};
constexpr unsigned kExternalLoopNum = 100;
constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue
EventsWaiter events_waiter;
WorkQueueOptions options(/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, &events_waiter);
auto work_queue = CreateMultiThreadedWorkQueue(options);
// NumThreads
EXPECT_EQ(work_queue->NumThreads(), 10u);
Expand All @@ -75,7 +80,7 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
}
// WaitQueueEmpty
EXPECT_EQ(finished.load(), false);
work_queue->WaitQueueEmpty();
events_waiter.WaitEvent();
EXPECT_EQ(finished.load(), true);
EXPECT_EQ(counter.load(), kLoopNum * kExternalLoopNum);
}
Expand All @@ -84,15 +89,17 @@ TEST(WorkQueue, TestWorkQueueGroup) {
using paddle::framework::WorkQueueOptions;
using paddle::framework::WorkQueueGroup;
using paddle::framework::CreateWorkQueueGroup;
using paddle::framework::EventsWaiter;
std::atomic<bool> finished{false};
std::atomic<unsigned> counter{0};
constexpr unsigned kExternalLoopNum = 100;
constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue
// ThreadedWorkQueueGroup
EventsWaiter events_waiter;
WorkQueueOptions sq_options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, &events_waiter);
WorkQueueOptions mq_options(/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, &events_waiter);
auto queue_group = CreateWorkQueueGroup({sq_options, mq_options});
// NumThreads
EXPECT_EQ(queue_group->QueueNumThreads(0), 1u);
Expand All @@ -113,6 +120,6 @@ TEST(WorkQueue, TestWorkQueueGroup) {
}
});
// WaitQueueGroupEmpty()
queue_group->WaitQueueGroupEmpty();
events_waiter.WaitEvent();
EXPECT_EQ(counter.load(), kLoopNum * kExternalLoopNum + kLoopNum);
}
Loading