Batch barrier in send/recv op #7847
@@ -45,6 +45,8 @@ class AsyncGRPCServer final : public sendrecv::SendRecvService::Service {
  void WaitCond(int cond);
  void SetCond(int cond);
  void WaitClientGet(int count);
  bool CondEqualTo(int cond);
  void SubCond(int arg);
SubCond is not used in this PR.
Yep, this PR is still a work in progress. This interface will be used in rpc_server when it receives a BatchBarrier RPC call; I will implement it shortly.
@@ -97,6 +96,21 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
  return true;
}

bool RPCClient::AsyncBatchBarrier(const std::string& ep, int64_t time_out) {
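BatchBarrier is a noun; judging from this implementation, the method should be named AsyncSendBatchBarrier. Alternatively, it might be more convenient to turn this implementation into a synchronous call.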
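Done. A server that supports both synchronous and asynchronous requests could get fairly complex. The current approach can reuse the AsyncSendVariable interface to send the barrier signal, which keeps the code much simpler. If there is a strong need for synchronous requests later, shall we implement a synchronous interface then?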
@@ -132,6 +132,8 @@ void AsyncGRPCServer::RunSyncUpdate() {

  cq_send_ = builder.AddCompletionQueue();
  cq_get_ = builder.AddCompletionQueue();
  cq_batch_barrier_ = builder.AddCompletionQueue();
cq_batch_barrier_ seems to never be used.
Done. Sorry, I forgot to delete the old code.
@@ -69,6 +70,7 @@ class AsyncGRPCServer final : public sendrecv::SendRecvService::Service {
  volatile bool is_shut_down_ = false;
  std::unique_ptr<grpc::ServerCompletionQueue> cq_send_;
  std::unique_ptr<grpc::ServerCompletionQueue> cq_get_;
  std::unique_ptr<grpc::ServerCompletionQueue> cq_batch_barrier_;
cq_batch_barrier_ seems to never be used.
Done.
paddle/operators/recv_op.cc
Outdated
      batch_barrier++;
      continue;
    }
    barrier_size++;
barrier_size is used only for logging; it can be removed or renamed.
Done.
paddle/operators/recv_op.cc
Outdated
for (size_t i = 0; i < barrier_size; ++i) {
size_t barrier_size = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in || !rpc_service_->IsRecvQueueEmpty()) {
Not sure why !rpc_service_->IsRecvQueueEmpty() is needed. rpc_service_->Get() is a blocking call which will wait until a new message arrives.
Thanks, it's not used :)
I deleted !rpc_service_->IsRecvQueueEmpty(), because the send op sends the barrier signal last. If RecvOp receives a barrier signal, it must be the last message from that trainer.
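For readers following this thread, here is a minimal sketch of the simplified loop condition, assuming each trainer sends its barrier signal after all of its variables. It is not the code from recv_op.cc: the marker string kBatchBarrier, the DrainResult struct and the DrainUntilBarriers function are hypothetical names, and a std::deque stands in for the blocking rpc_service_->Get() queue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical marker name; the real constant used by the PR is not shown here.
const std::string kBatchBarrier = "BATCH_BARRIER";

struct DrainResult {
  int barriers = 0;           // barrier signals seen so far
  std::size_t variables = 0;  // ordinary variable messages processed
};

// `inbox` stands in for the receive queue, already filled in arrival order.
// The loop stops as soon as one barrier per trainer (fan_in) has arrived,
// so no extra "is the queue empty" check is needed.
DrainResult DrainUntilBarriers(std::deque<std::string>& inbox, int fan_in) {
  assert(fan_in >= 0);
  DrainResult result;
  while (result.barriers != fan_in) {
    assert(!inbox.empty());  // a real blocking Get() would wait here instead
    std::string msg = inbox.front();
    inbox.pop_front();
    if (msg == kBatchBarrier) {
      ++result.barriers;
      continue;
    }
    ++result.variables;
  }
  return result;
}
```

With two trainers, the loop consumes every message up to and including the second barrier and leaves anything that arrives afterwards in the queue for the next batch.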
LGTM, one minor comment can be updated later.
paddle/operators/send_op.cc
Outdated
  client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
}
PADDLE_ENFORCE(client_.Wait());

std::set<std::string> epset(epmap.begin(), epmap.end());
Using the endpoints attribute would achieve the same thing.
Done.
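As context for the epset line above: epmap holds one endpoint per sent variable, so the same endpoint can appear several times, while a barrier should go to each server only once. A minimal sketch of that deduplication, assuming plain string endpoints; BarrierTargets is a hypothetical helper, not a function from send_op.cc.

```cpp
#include <set>
#include <string>
#include <vector>

// Collapse the per-variable endpoint list into the unique endpoints that
// should each receive exactly one barrier signal. std::set also sorts them.
std::vector<std::string> BarrierTargets(const std::vector<std::string>& epmap) {
  std::set<std::string> epset(epmap.begin(), epmap.end());
  return std::vector<std::string>(epset.begin(), epset.end());
}
```

If the op already carries the list of distinct endpoints as an attribute, as the reviewer suggests, this step is redundant and the attribute can be iterated directly.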
LGTM again.
LGTM++
LGTM!
Fixed #7764