[Fleet Executor] Support multi carrier #38535

Merged
merged 2 commits into from
Dec 30, 2021

Conversation

Contributor

@LiYuRio LiYuRio commented Dec 28, 2021

PR types

Others

PR changes

Others

Describe

  • Support a single FleetExecutor managing multiple carriers

[Screenshot, 2021-12-28 4:20:14 PM]

@paddle-bot-old

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

@LiYuRio LiYuRio changed the title Support multi carrier [Fleet Executor] Support multi carrier Dec 28, 2021
namespace paddle {
namespace distributed {

template <typename KeyT, typename ValueT>
Contributor

With this design, is the meaning of each type-to-type combination fixed?
If we later need other int64-to-int64 mappings, such as interceptor id to thread id or carrier id to stream id, how would this class be reused?

Contributor Author

There is a TODO for this; the next PR will change it.

Contributor

@FeixLiu FeixLiu Dec 29, 2021

Do you mean the TODO about replacing int64 with an InterceptorId structure? In that case, how would you handle an interceptor id to thread id mapping and an interceptor id to carrier id mapping existing at the same time? Or will the global map class itself be updated later, with the TODO placed in that class?

Contributor Author

@LiYuRio LiYuRio Dec 29, 2021

That scenario doesn't exist right now, and if it ever does, we can add ThreadID and CarrierID types as well.
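
For readers following this thread, the concern and the proposed fix can be sketched as follows. This is a minimal illustration, not the PR's code: `TypedGlobalMap` and the three ID structs are hypothetical names, and the real `GlobalMap` stores `std::unique_ptr<ValueT>` rather than plain values. The point is only that wrapping each `int64_t` in its own type gives every mapping its own static table, so two int64-to-int64 mappings no longer collide.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical strong ID types; the PR itself uses plain int64_t keys.
struct InterceptorId { int64_t value; };
struct CarrierId { int64_t value; };
struct ThreadId { int64_t value; };

// Simplified stand-in for a global map: each (KeyT, ValueT) pair
// instantiates its own static table, so distinct ID types never share storage.
template <typename KeyT, typename ValueT>
class TypedGlobalMap {
 public:
  static void Set(KeyT key, ValueT value) {
    std::lock_guard<std::mutex> guard(Mutex());
    Table()[key.value] = value;
  }

  // Returns nullptr when the key has not been registered. The lock only
  // protects the lookup itself, not later reads through the pointer.
  static const ValueT* Get(KeyT key) {
    std::lock_guard<std::mutex> guard(Mutex());
    auto it = Table().find(key.value);
    return it == Table().end() ? nullptr : &it->second;
  }

 private:
  static std::unordered_map<int64_t, ValueT>& Table() {
    static std::unordered_map<int64_t, ValueT> table;
    return table;
  }
  static std::mutex& Mutex() {
    static std::mutex mutex;
    return mutex;
  }
};
```

Under these assumptions, `TypedGlobalMap<InterceptorId, CarrierId>` and `TypedGlobalMap<InterceptorId, ThreadId>` can both hold an entry for interceptor 7 without interfering, which is the case the reviewer asked about.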

@FeixLiu FeixLiu requested a review from wangxicoding December 30, 2021 01:07
@@ -140,7 +156,9 @@ bool Carrier::Send(const InterceptorMessage& msg) {
   if (src_rank == dst_rank) {
     VLOG(3) << "Send a message from interceptor " << src_id
             << " to interceptor " << dst_id << ", which are in the same ranks.";
-    return EnqueueInterceptorMessage(msg);
+    int64_t carrier_id = *GlobalMap<int64_t, int64_t>::Get(dst_id);
+    return GlobalMap<int64_t, Carrier>::Get(carrier_id)
Contributor

There's no need for different carriers to communicate with each other, is there?

Contributor Author

@LiYuRio LiYuRio Dec 30, 2021

This is needed because the src interceptor and the dst interceptor may belong to different carriers that sit on the same rank. The src interceptor calls its own carrier's Send, so if that case isn't handled here, the message goes to the wrong carrier.

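Contributor

Right, but I still don't think different carriers need to communicate. In your scenario the src and dst are clearly related, so they should belong to the same carrier. Different carriers can correspond to different programs, for example a training program and an inference program: train for one epoch, then run a prediction pass. Those are independent of each other in both timing and relationship.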
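
The same-rank case the author describes can be sketched roughly as below. Everything here is a hypothetical simplification: `MiniCarrier`, `Registry`, and `SendSameRank` stand in for `Carrier`, the two `GlobalMap` lookups in the diff, and the same-rank branch of `Carrier::Send`. The sketch shows a message being enqueued on the carrier that owns the destination interceptor rather than on the sender's own carrier.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

struct Message {
  int64_t src_id;
  int64_t dst_id;
};

// Hypothetical, simplified carrier: it only records the messages it receives.
struct MiniCarrier {
  std::vector<Message> inbox;
  bool Enqueue(const Message& msg) {
    inbox.push_back(msg);
    return true;
  }
};

// Hypothetical registry standing in for the two global lookups in the diff:
// interceptor id -> carrier id, and carrier id -> carrier.
struct Registry {
  std::unordered_map<int64_t, int64_t> interceptor_to_carrier;
  std::unordered_map<int64_t, MiniCarrier> carriers;
};

// Same-rank send: deliver to the carrier that owns msg.dst_id, which may
// differ from the carrier of the sending interceptor.
bool SendSameRank(Registry& reg, const Message& msg) {
  auto owner = reg.interceptor_to_carrier.find(msg.dst_id);
  if (owner == reg.interceptor_to_carrier.end()) return false;
  auto carrier = reg.carriers.find(owner->second);
  if (carrier == reg.carriers.end()) return false;
  return carrier->second.Enqueue(msg);
}
```

If, as the reviewer argues, related interceptors always live in the same carrier, the first lookup would always return the sender's own carrier and the extra indirection would be unnecessary.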

msg_bus_->IsInit(), true,
platform::errors::Unavailable("MessageBus has not been init yet."));
GetCarrier()->Start();
for (const auto& item : runtime_graph_->carrier_id_to_interceptor_ids()) {
Contributor

Are multiple carriers meant to run at the same time?

Contributor

A CPU carrier and a GPU carrier?

Contributor Author

The idea is to start them all at the same time; a carrier that can't run yet can simply wait.

Contributor

Only one carrier should be running at any given moment, right? Concurrency should come from the graph's topological dependencies: nodes run at the same time when the topology allows it.

Contributor

Only one carrier should be run here. Different Programs then map to different carriers, and an outer interface selects which carrier to run.
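
One way to read the reviewer's suggestion is sketched below. `StubCarrier` and `MiniExecutor` are hypothetical names used only for illustration; the real `FleetExecutor` interface is not shown in this thread. The caller picks a carrier by id (for example one per Program) and the executor starts only that one.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <unordered_map>

// Hypothetical carrier stub: counts how many times it has been started.
struct StubCarrier {
  int runs = 0;
  void Start() { ++runs; }
};

// Hypothetical executor that owns several carriers but runs one per call.
class MiniExecutor {
 public:
  void AddCarrier(int64_t carrier_id) {
    carriers_.emplace(carrier_id, StubCarrier{});
  }

  // Run exactly one carrier, chosen by the caller.
  void Run(int64_t carrier_id) {
    auto it = carriers_.find(carrier_id);
    if (it == carriers_.end()) {
      throw std::out_of_range("unknown carrier id");
    }
    it->second.Start();
  }

  int Runs(int64_t carrier_id) const { return carriers_.at(carrier_id).runs; }

 private:
  std::unordered_map<int64_t, StubCarrier> carriers_;
};
```

With this shape, a training program and an inference program could each get their own carrier id, and the caller would alternate `Run` calls between them instead of starting every carrier in a loop.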


private:
static std::unique_ptr<ValueT>* GetPPtr(KeyT id) {
static std::mutex mutex;
Contributor

In the current use cases, carriers are all initialized up front and then fixed. With no concurrent reads and writes, the lock isn't needed. That said, carriers may well become dynamic later.

Contributor Author

Besides multi-carrier support, this global map is also used for the interceptor id to carrier id map, where concurrent access can occur, so I made the two uniform. However, since interceptors aren't added or removed dynamically at the moment, building that map could also be moved into initialization.

Contributor Author

Let's add a lock-free class later; locking on every access is indeed expensive.
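
A lock-free variant along the lines discussed here might look like the sketch below. `FrozenMap` is a hypothetical name, not something from the PR. It assumes all inserts happen during single-threaded initialization and that `Freeze()` is called before any reader thread starts; after that, only `const` lookups occur, which standard containers allow concurrently without a mutex.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <unordered_map>

// Hypothetical map that is written only during initialization and is
// read-only afterwards, so lookups do not take a lock.
template <typename KeyT, typename ValueT>
class FrozenMap {
 public:
  // Must only be called from the initializing thread, before Freeze().
  void Insert(const KeyT& key, const ValueT& value) {
    if (frozen_) throw std::logic_error("map is frozen");
    table_[key] = value;
  }

  // Call once initialization is complete and before readers are started.
  void Freeze() { frozen_ = true; }

  // Lock-free lookup; returns nullptr when the key is absent.
  const ValueT* Find(const KeyT& key) const {
    auto it = table_.find(key);
    return it == table_.end() ? nullptr : &it->second;
  }

 private:
  std::unordered_map<KeyT, ValueT> table_;
  bool frozen_ = false;
};
```

This trades flexibility for speed: if interceptors or carriers become dynamic later, as the reviewer notes they might, the locked version would still be needed for those maps.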

if (request->ctrl_message()) {
carrier_id = 0;
} else {
carrier_id = *GlobalMap<int64_t, int64_t>::Get(request->dst_id());
Contributor

The carrier is already looked up here, so the earlier code in Carrier doesn't need to look it up again.

Contributor Author

This is a different scenario from the earlier one in Carrier. This code handles messages sent between different ranks, while the earlier code handles messages between different carriers on the same rank.
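
The receiving-side dispatch for cross-rank messages can be sketched as a small pure function. `IncomingRequest`, `ResolveCarrierId`, and `kUnknownCarrier` are hypothetical names; the real code reads these fields from the request object and dereferences the `GlobalMap` result directly. The sketch mirrors the diff's rule: control messages go to carrier 0, everything else goes to the carrier that owns the destination interceptor.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical, trimmed-down view of an incoming cross-rank request.
struct IncomingRequest {
  bool ctrl_message;
  int64_t dst_id;
};

// Sentinel for "no carrier owns this interceptor" (an assumption of this
// sketch; the diff does not show how a missing entry is handled).
constexpr int64_t kUnknownCarrier = -1;

int64_t ResolveCarrierId(
    const IncomingRequest& request,
    const std::unordered_map<int64_t, int64_t>& interceptor_to_carrier) {
  if (request.ctrl_message) {
    return 0;  // Mirrors the hard-coded carrier 0 in the diff.
  }
  auto it = interceptor_to_carrier.find(request.dst_id);
  return it == interceptor_to_carrier.end() ? kUnknownCarrier : it->second;
}
```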

// TODO(liyurui): Remove this hard code.
int64_t carrier_id;
if (request->ctrl_message()) {
carrier_id = 0;
Contributor

The barrier logic could probably be moved right here.

Contributor

Pass the message bus instance down? Putting the message bus in the global map as well seems like it would work too.

Contributor

Right, though that would be a bit cumbersome as well.

Contributor Author

> The barrier logic could probably be moved right here.

Yes.

Contributor

With the global map, the carrier wouldn't need to store the message bus instance anymore, would it? Storing just the id would be enough.

// NOTE: need Init msg_bus after carrier SetMsgBus
carrier->Init(0, interceptor_id_to_rank, {0});
msg_bus->Init(0, {{0, ip0}, {1, ip1}}, ip0);
carrier->SetMsgBus(msg_bus);
Contributor

This line is duplicated.

carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f));
carrier->SetInterceptor(0, InterceptorFactory::Create("Compute", 0, node_a));
carrier->SetInterceptor(1,
InterceptorFactory::Create("Amplifier", 1, node_b));
Contributor

The line breaks are no longer aligned 🥺

Contributor

@wangxicoding wangxicoding left a comment

LGTM

@wangxicoding wangxicoding merged commit 3658405 into PaddlePaddle:develop Dec 30, 2021
@LiYuRio LiYuRio deleted the dev_multi_carriers branch December 31, 2021 09:55
3 participants