Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events waiter #40876

Merged
merged 88 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
8744ba7
add align for WorkQueue
liutiexing Sep 22, 2021
4759bc8
Merge branch 'develop' of /~https://github.com/liutiexing/Paddle into d…
liutiexing Sep 22, 2021
6f00ace
add spinlock
liutiexing Sep 23, 2021
2d6f1cf
merge develop
liutiexing Sep 23, 2021
f5099be
Merge branch 'develop' of /~https://github.com/liutiexing/Paddle into d…
liutiexing Sep 26, 2021
54aa332
merge develop
liutiexing Oct 12, 2021
1d1bd82
merge
liutiexing Oct 12, 2021
dfbf3e4
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 12, 2021
a5392b3
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 14, 2021
e206173
Add EventsWaiter
liutiexing Oct 15, 2021
45422f8
Add EventsWaiter
liutiexing Oct 15, 2021
c446655
update
liutiexing Oct 15, 2021
c145502
Merge remote-tracking branch 'upstream/develop' into EventsWaiter
liutiexing Oct 15, 2021
0a3dcd9
Revert "Add EventsWaiter"
liutiexing Oct 15, 2021
4689bb5
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 15, 2021
46f4baa
update
liutiexing Oct 16, 2021
bf8c171
Merge remote-tracking branch 'upstream/develop' into EventsWaiter
liutiexing Oct 16, 2021
843844f
update Error MSG
liutiexing Oct 16, 2021
0cec99a
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 20, 2021
bf88cd7
Merge remote-tracking branch 'upstream/develop' into EventsWaiter
liutiexing Oct 21, 2021
1c2e10d
update EventsWaiter
liutiexing Oct 21, 2021
481c4fa
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Oct 27, 2021
83db84e
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Oct 29, 2021
7010e0d
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 16, 2021
ec2a363
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 23, 2021
90a59ec
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 26, 2021
1445bbe
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 29, 2021
a2c74ab
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 1, 2021
1c09b4e
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 2, 2021
cb8cf7d
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 8, 2021
cf0dcd6
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 8, 2021
2f95801
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 14, 2021
14bec1b
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 15, 2021
8a5f7af
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 16, 2021
f0a5915
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 20, 2021
0fe35aa
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 21, 2021
f65eef2
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 23, 2021
b37e42d
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 28, 2021
cf5e240
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 29, 2021
b31869a
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 30, 2021
fab2911
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 4, 2022
16b0903
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 6, 2022
074fea5
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 7, 2022
8f4a51c
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 8, 2022
09036ff
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 10, 2022
0e6a94f
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 13, 2022
d2293fd
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 14, 2022
b529801
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 17, 2022
ff55840
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 17, 2022
52684e7
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 21, 2022
e806789
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 22, 2022
e59a3f8
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 24, 2022
8fa5e17
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 25, 2022
5c8ffbd
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 25, 2022
e5586e9
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 26, 2022
e5731a4
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Jan 28, 2022
67cd2a6
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 10, 2022
df6298b
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 11, 2022
7edaab6
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 11, 2022
bdec640
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 11, 2022
7c0736c
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 14, 2022
fc9f166
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 16, 2022
91a3729
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 17, 2022
dd1ad2c
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 19, 2022
c5afd93
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 19, 2022
0d45241
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 21, 2022
696f4c7
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 21, 2022
0b14de3
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 22, 2022
92e1b3d
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 22, 2022
db7a3c6
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 23, 2022
9f91993
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 23, 2022
116a1f3
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 24, 2022
88f6444
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 25, 2022
bdb927e
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 25, 2022
71e2bda
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 26, 2022
bc3edb6
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 27, 2022
3735f63
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Feb 28, 2022
9e2ffcf
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 1, 2022
581de82
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 2, 2022
5a3d862
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 3, 2022
01e6716
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 11, 2022
3e6a64c
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 16, 2022
b03f605
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 21, 2022
e4754e1
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 21, 2022
8146e43
merge develop
Mar 22, 2022
565773a
update
Mar 23, 2022
5b758c2
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Mar 23, 2022
2780d08
merge develop
Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ InterpreterCore::~InterpreterCore() {
// cancle gc's thread
gc_.reset(nullptr);

exception_notifier_->UnregisterEvent();
completion_notifier_->UnregisterEvent();

async_work_queue_.reset(nullptr);
}

Expand Down
173 changes: 117 additions & 56 deletions paddle/fluid/framework/new_executor/workqueue/events_waiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,79 @@
namespace paddle {
namespace framework {

constexpr EventsWaiter::EventId kEmptyEventId = 0;

EventsWaiter::EventsWaiter()
: trigger_event_(nullptr), counter_(0), waiting_(false), cv_(1) {}
: trigger_event_(kEmptyEventId),
counter_(0),
eof_(true),
waiting_(false),
cv_(1) {}

std::shared_ptr<EventsWaiter::EventNotifier> EventsWaiter::RegisterEvent(
const std::string& name, EventChecker checker) {
auto counter = counter_.fetch_add(1);
auto id = std::hash<std::string>()(name + std::to_string(counter));
EventId id = kEmptyEventId;
EventInfo* evt = nullptr;
do {
auto counter = counter_.fetch_add(1);
id = std::hash<std::string>()(name + std::to_string(counter));
if (id == kEmptyEventId) {
continue;
}
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
if (events_.count(id) > 0) {
continue;
}
evt = &(events_[id]);
} while (evt == nullptr);
evt->id = id;
evt->name = name;
evt->type = TriggerType::LevelTriggered;
evt->checker = std::move(checker);
eof_.store(false, std::memory_order_relaxed);
VLOG(10) << "Register event id:" << id << " name:" << name;
auto notifier = std::shared_ptr<EventNotifier>(new EventNotifier(id, this));
EventInfo evt{id, name, TriggerType::LevelTriggered, std::move(checker)};
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_[id] = std::move(evt);
return notifier;
}

std::shared_ptr<EventsWaiter::EventNotifier> EventsWaiter::RegisterEvent(
const std::string& name) {
auto counter = counter_.fetch_add(1);
auto id = std::hash<std::string>()(name + std::to_string(counter));
EventId id = kEmptyEventId;
EventInfo* evt = nullptr;
do {
auto counter = counter_.fetch_add(1);
id = std::hash<std::string>()(name + std::to_string(counter));
if (id == kEmptyEventId) {
continue;
}
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
if (events_.count(id) > 0) {
continue;
}
evt = &(events_[id]);
} while (evt == nullptr);
evt->id = id;
evt->name = name;
evt->type = TriggerType::EdgeTriggered;
evt->checker = []() { return false; };
eof_.store(false, std::memory_order_relaxed);
VLOG(10) << "Register event id:" << id << " name:" << name;
auto notifier = std::shared_ptr<EventNotifier>(new EventNotifier(id, this));
EventInfo evt{id, name, TriggerType::EdgeTriggered, []() { return false; }};
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_[id] = std::move(evt);
return notifier;
}

void EventsWaiter::UnregisterEvent(const EventId& id) {
VLOG(10) << "Unregister event id:" << id;
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_.erase(id);
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
deleted_events_.insert(id);
if (deleted_events_.size() == events_.size()) {
eof_.store(true, std::memory_order_relaxed);
}
}
if (eof_.load(std::memory_order_relaxed)) {
cv_.Notify(true);
}
}

std::string EventsWaiter::WaitEvent() {
Expand All @@ -61,42 +103,60 @@ std::string EventsWaiter::WaitEvent() {
PADDLE_THROW(
platform::errors::ResourceExhausted("Another thread is waiting."));
}

auto w = cv_.GetWaiter(0);
cv_.Prewait();
std::string* triggered = trigger_event_;
if (triggered == nullptr) {
EventId triggered = trigger_event_;
while (triggered == kEmptyEventId && !eof_) {
cv_.Prewait();

// double check
triggered = trigger_event_;
// checkers
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
for (auto& kv : events_) {
auto& evt = kv.second;
if (TriggerType::LevelTriggered == evt.type && evt.checker()) {
triggered = new std::string(evt.name);
break;
if (triggered == kEmptyEventId) {
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
for (auto& kv : events_) {
auto& evt = kv.second;
if (TriggerType::LevelTriggered == evt.type && evt.checker()) {
triggered = evt.id;
break;
}
}
}
}
if (triggered != nullptr) {
std::string* prev = nullptr;
if (!trigger_event_.compare_exchange_strong(prev, triggered,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
delete triggered;
triggered = prev;
if (triggered != kEmptyEventId) {
EventId prev = kEmptyEventId;
if (!trigger_event_.compare_exchange_strong(
prev, triggered, std::memory_order_seq_cst,
std::memory_order_relaxed)) {
triggered = prev;
}
}
}

if (triggered != kEmptyEventId || eof_) {
cv_.CancelWait();
} else {
cv_.CommitWait(w);
triggered = trigger_event_;
}
}
if (triggered) {
cv_.CancelWait();
} else {
cv_.CommitWait(w);
triggered = trigger_event_;

trigger_event_.store(kEmptyEventId, std::memory_order_relaxed);
waiting_.store(false, std::memory_order_relaxed);
std::string evt_name =
triggered == kEmptyEventId ? "NoEventNotifier" : GetEventName(triggered);
VLOG(10) << "Consume event id:" << triggered << ", name:" << evt_name;
// lazy deletion
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
if (deleted_events_.size() > 0) {
for (auto evt : deleted_events_) {
events_.erase(evt);
}
deleted_events_.clear();
}
}
trigger_event_.store(nullptr, std::memory_order_relaxed);
waiting_.store(false);
auto trigger_event = *triggered;
delete triggered;
return trigger_event;
return evt_name;
}

int EventsWaiter::Clear() {
Expand All @@ -106,32 +166,33 @@ int EventsWaiter::Clear() {
std::memory_order_relaxed)) {
return -1;
}
trigger_event_.store(nullptr, std::memory_order_relaxed);
trigger_event_.store(kEmptyEventId, std::memory_order_relaxed);
waiting_.store(false);
return 0;
}

void EventsWaiter::TriggerEvent(const EventId& id) {
VLOG(10) << "Try to trigger event id:" << id;
std::string* trigger_event = new std::string;
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
auto iter = events_.find(id);
if (iter == events_.end()) {
delete trigger_event;
return;
}
*trigger_event = iter->second.name;
EventId prev = kEmptyEventId;
if (!trigger_event_.compare_exchange_strong(
prev, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
VLOG(10) << "Event id:" << prev << " is pending";
return;
}
std::string* prev = nullptr;
if (!trigger_event_.compare_exchange_strong(prev, trigger_event,
VLOG(10) << "Triggered event id:" << id;
cv_.Notify(true);
}

void EventsWaiter::CancelEvent(const EventId& id) {
VLOG(10) << "Try to cancel event id:" << id;
EventId prev = id;
if (!trigger_event_.compare_exchange_strong(prev, kEmptyEventId,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
delete trigger_event;
VLOG(10) << "Event id:" << prev << " is pending";
return;
}
VLOG(10) << "Triggered event id:" << id << " name:" << *trigger_event;
cv_.Notify(true);
VLOG(10) << "Cancelled event id:" << id;
}

std::string EventsWaiter::GetEventName(const EventId& id) {
Expand Down
14 changes: 9 additions & 5 deletions paddle/fluid/framework/new_executor/workqueue/events_waiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include "paddle/fluid/framework/new_executor/workqueue/event_count.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"

Expand All @@ -37,13 +38,12 @@ class EventsWaiter {
// Make sure EventsWaiter has a longer lifetime than EventNotifier.
class EventNotifier {
public:
void NotifyEvent() { waiter_.TriggerEvent(id_); }
~EventNotifier() { waiter_.UnregisterEvent(id_); }

void UnregisterEvent() { waiter_.UnregisterEvent(id_); }
void NotifyEvent() { waiter_.TriggerEvent(id_); }

EventId GetEventId() { return id_; }
void CancelEvent() { waiter_.CancelEvent(id_); }

// return "Unregistered" if the corresponding event was unregistered.
std::string GetEventName() { return waiter_.GetEventName(id_); }

private:
Expand Down Expand Up @@ -97,12 +97,16 @@ class EventsWaiter {

void TriggerEvent(const EventId& id);

void CancelEvent(const EventId& id);

std::string GetEventName(const EventId& id);

std::unordered_map<EventId, EventInfo> events_;
std::unordered_set<EventId> deleted_events_;
paddle::memory::SpinLock events_lock_;
std::atomic<std::string*> trigger_event_;
std::atomic<EventId> trigger_event_;
std::atomic<uint64_t> counter_;
std::atomic<bool> eof_;
std::atomic<bool> waiting_;
EventCount cv_;
};
Expand Down
21 changes: 4 additions & 17 deletions paddle/fluid/framework/new_executor/workqueue/workqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ class WorkQueueImpl : public WorkQueue {
public:
explicit WorkQueueImpl(const WorkQueueOptions& options) : WorkQueue(options) {
if (options_.track_task && options.events_waiter != nullptr) {
empty_notifier_ = options.events_waiter->RegisterEvent(kQueueEmptyEvent);
void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
TaskTracker* tracker = reinterpret_cast<TaskTracker*>(storage);
empty_notifier_ = options.events_waiter->RegisterEvent(
kQueueEmptyEvent,
[tracker]() { return tracker->PendingTaskNum() == 0; });
tracker_ = new (storage) TaskTracker(*empty_notifier_.get());
}
if (options_.detached == false && options.events_waiter != nullptr) {
Expand All @@ -47,17 +44,13 @@ class WorkQueueImpl : public WorkQueue {
}

virtual ~WorkQueueImpl() {
if (empty_notifier_) {
empty_notifier_->UnregisterEvent();
}
delete queue_;
if (tracker_ != nullptr) {
tracker_->~TaskTracker();
AlignedFree(tracker_);
}
if (destruct_notifier_) {
destruct_notifier_->NotifyEvent();
destruct_notifier_->UnregisterEvent();
}
}

Expand Down Expand Up @@ -124,14 +117,12 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
const auto& options = queues_options_[idx];
if (options.track_task && tracker_ == nullptr &&
options.events_waiter != nullptr) {
empty_notifier_ = options.events_waiter->RegisterEvent(kQueueEmptyEvent);
void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
TaskTracker* tracker = reinterpret_cast<TaskTracker*>(storage);
empty_notifier_ = options.events_waiter->RegisterEvent(
kQueueEmptyEvent,
[tracker]() { return tracker->PendingTaskNum() == 0; });
tracker_ = new (storage) TaskTracker(*empty_notifier_.get());
}
if (options.detached == false && options.events_waiter != nullptr) {
if (options.detached == false && options.events_waiter != nullptr &&
!destruct_notifier_) {
destruct_notifier_ =
options.events_waiter->RegisterEvent(kQueueDestructEvent);
}
Expand All @@ -141,9 +132,6 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
}

WorkQueueGroupImpl::~WorkQueueGroupImpl() {
if (empty_notifier_) {
empty_notifier_->UnregisterEvent();
}
for (auto queue : queues_) {
queue->~NonblockingThreadPool();
}
Expand All @@ -154,7 +142,6 @@ WorkQueueGroupImpl::~WorkQueueGroupImpl() {
free(queues_storage_);
if (destruct_notifier_) {
destruct_notifier_->NotifyEvent();
destruct_notifier_->UnregisterEvent();
}
}

Expand Down
Loading