Support multi carriers #38709
Thanks for your contribution!
// Message bus will be created and inited only once
GlobalVal<MessageBus>::Create();
InitMessageBus();
GlobalVal<MessageBus>::Get()->Barrier();
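Initializing the message bus before any carrier may be a problem. Once the message bus is initialized it can receive messages. If a message arrives from another rank before this rank has created a carrier, what happens to that message? It would be better to init the message bus only after at least one carrier has been initialized.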
dst_interceptor->EnqueueRemoteInterceptorMessage(interceptor_message);
}
PADDLE_ENFORCE_EQ(
interceptor_message.ctrl_message(), false,
How about pulling Barrier out into its own service later?
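I think that works, and I have been thinking about this problem. With multiple carriers, not only does the message bus need a barrier, the carriers need a barrier among themselves as well.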
InitMessageBus();

// Wait for all message bus connected.
msg_bus_->Barrier();
This barrier can't be dropped, can it? The downstream Interceptors have to be set up before messages can start being sent.
Fixed.
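The ordering concern in this thread (every participant must finish setup before anyone proceeds past the barrier) can be illustrated with a minimal in-process sketch. This is not the PR's implementation: the Barrier in the PR synchronizes ranks over the network, which this does not model. `SimpleBarrier` and `RunParticipants` are hypothetical names introduced only for illustration, and threads stand in for ranks.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical in-process barrier (assumption: threads stand in for ranks).
class SimpleBarrier {
 public:
  explicit SimpleBarrier(std::size_t n) : expected_(n) { assert(n > 0); }

  // Blocks until `expected_` participants have called Wait(). Reusable,
  // because each round is tracked by a generation counter.
  void Wait() {
    std::unique_lock<std::mutex> lock(mu_);
    const std::size_t gen = generation_;
    if (++arrived_ == expected_) {
      arrived_ = 0;
      ++generation_;
      cv_.notify_all();
    } else {
      cv_.wait(lock, [&] { return generation_ != gen; });
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  const std::size_t expected_;
  std::size_t arrived_ = 0;
  std::size_t generation_ = 0;
};

// Each participant marks itself ready (standing in for "interceptors are
// set up"), then waits. Returns how many participants observed that all
// setups had completed once they passed the barrier.
inline std::size_t RunParticipants(std::size_t n) {
  SimpleBarrier barrier(n);
  std::atomic<std::size_t> ready{0};
  std::atomic<std::size_t> ok{0};
  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < n; ++i) {
    threads.emplace_back([&] {
      ready.fetch_add(1);
      barrier.Wait();
      if (ready.load() == n) ok.fetch_add(1);
    });
  }
  for (auto& t : threads) t.join();
  return ok.load();
}
```

If the `barrier.Wait()` call were removed, a participant could reach the check before the others had marked themselves ready, which is the situation the reviewer is warning about.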
static T value;
return &value;
static std::unique_ptr<T>* GetPPtr() {
static std::unique_ptr<T> ptr;
Why was this changed to unique_ptr, and why return a pointer to the unique_ptr...
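It was changed to a pointer because I don't want MessageBus to be copied around, so its copy constructor and copy assignment are disabled. It was changed to unique_ptr to avoid managing the memory by hand; otherwise we would also need a Release interface.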
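The design described in this reply can be sketched roughly as follows. This is an assumption-laden illustration rather than the PR's code: only `GetPPtr`, `Create`, and `Get` appear in the diff fragments, their bodies here are guesses, `Bus` is a hypothetical stand-in for MessageBus, and the once-only assertion in `Create` is an assumption based on the "created and inited only once" comment.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for MessageBus: copying is disabled.
class Bus {
 public:
  explicit Bus(std::string addr) : addr_(std::move(addr)) {}
  Bus(const Bus&) = delete;
  Bus& operator=(const Bus&) = delete;
  const std::string& addr() const { return addr_; }

 private:
  std::string addr_;
};

// Sketch of a global holder backed by a function-local static unique_ptr.
template <typename T>
class GlobalVal {
 public:
  // Returns the held object, or nullptr if Create() has not been called.
  static T* Get() { return GetPPtr()->get(); }

  // Constructs the object in place. Assumption: creation happens once.
  template <typename... Args>
  static T* Create(Args&&... args) {
    std::unique_ptr<T>* pptr = GetPPtr();
    assert(pptr->get() == nullptr && "Create() called twice");
    *pptr = std::make_unique<T>(std::forward<Args>(args)...);
    return pptr->get();
  }

 private:
  static std::unique_ptr<T>* GetPPtr() {
    // Owned by the static unique_ptr, so no explicit Release is needed;
    // the object is destroyed during static destruction at program exit.
    static std::unique_ptr<T> ptr;
    return &ptr;
  }
};

// Example use: create once, then fetch the same instance elsewhere.
inline bool DemoGlobalVal() {
  Bus* created = GlobalVal<Bus>::Create("127.0.0.1:0");
  return created == GlobalVal<Bus>::Get() &&
         created->addr() == "127.0.0.1:0";
}
```

Returning a pointer to the `unique_ptr` lets `Create` assign into the single static slot, while callers of `Get` only ever see a raw, non-owning `T*`.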
msg_bus->Init(0, {{0, ip0}, {1, ip1}}, ip0);
msg_bus->Barrier();
This Barrier should be added after the Interceptors have been set up, right?
Ah, I forgot to check the unit test.
GlobalVal<std::string>::Set(carrier_id);
Carrier* carrier = GlobalMap<std::string, Carrier>::Get(carrier_id);
// Set current running carrier
if (*GlobalVal<std::string>::Get() != carrier_id) {
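Written this way, there seems to be an implicit assumption: all Carriers either switch or don't switch at the same time. Otherwise it will hang there.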
LGTM
PR types
Others
PR changes
Others
Describe