Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi carriers #38709

Merged
merged 5 commits into from
Jan 7, 2022
Merged

Conversation

LiYuRio
Copy link
Contributor

@LiYuRio LiYuRio commented Jan 5, 2022

PR types

Others

PR changes

Others

Describe

  • Message Bus的初始化移动到FleetExecutor构造阶段,一个FleetExecutor只有一个Message Bus,不会重复创建。
  • 和brpc远端通信相关的全部移入message bus,增加DispatchMsgToCarrier的接口将远端信息交给carrier。

截屏2022-01-06 下午2 05 31

@paddle-bot-old
Copy link

paddle-bot-old bot commented Jan 5, 2022

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

// Message bus will be created and inited only once
GlobalVal<MessageBus>::Create();
InitMessageBus();
GlobalVal<MessageBus>::Get()->Barrier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在carrier之前init message bus可能存在问题。message bus初始化之后就是可以收消息了,如果收到消息别的rank发来的消息的时候,本rank还没有创建carrier,这个消息怎么办?还是在init至少一个carrier之后再init message bus吧。

dst_interceptor->EnqueueRemoteInterceptorMessage(interceptor_message);
}
PADDLE_ENFORCE_EQ(
interceptor_message.ctrl_message(), false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要不之后把Barrier单独拎一个service出来

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得可以,正在想这个问题,多carrier之后,不仅message bus需要barrier,carrier之间也需要barrier

InitMessageBus();

// Wait for all message bus connected.
msg_bus_->Barrier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个barrier不能少吧,需要下游的Interceptor建立起来才能开始发消息。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修改

static T value;
return &value;
static std::unique_ptr<T>* GetPPtr() {
static std::unique_ptr<T> ptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥换成了unique_ptr,而且返回unique_ptr的指针...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

换成指针是因为不想MessageBus乱复制,把它的拷贝复制那些禁掉了。换成unique_ptr是不想手动管理内存,要不还得有个Release接口。

msg_bus->Init(0, {{0, ip0}, {1, ip1}}, ip0);
msg_bus->Barrier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个Barrier得在Interceptor建立好后加吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

啊,单测忘看了

GlobalVal<std::string>::Set(carrier_id);
Carrier* carrier = GlobalMap<std::string, Carrier>::Get(carrier_id);
// Set current running carrier
if (*GlobalVal<std::string>::Get() != carrier_id) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这么写好像有个假设条件,所有Carrier都会同时切换/不切换,要不就会hang在那里。

Copy link
Contributor

@wangxicoding wangxicoding left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wangxicoding wangxicoding merged commit 769e5bc into PaddlePaddle:develop Jan 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants