Skip to content

Commit

Permalink
Fix runtime graph on gpt, add debug message (#37361)
Browse files Browse the repository at this point in the history
  • Loading branch information
LiYuRio authored Nov 19, 2021
1 parent edc3496 commit af83e79
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 27 deletions.
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ FleetExecutor::~FleetExecutor() {

void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
runtime_graph_ = std::make_unique<RuntimeGraph>(program_desc, exe_desc_);
VLOG(5) << runtime_graph_->DebugString();
InitCarrier();
InitMessageBus();
}
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once
#include <memory>
#include <string>

#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/platform/macros.h"
Expand Down
44 changes: 27 additions & 17 deletions paddle/fluid/distributed/fleet_executor/runtime_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,24 @@ using OpRole = paddle::framework::OpRole;
using OpRegistry = paddle::framework::OpRegistry;
using ProgramDesc = paddle::framework::ProgramDesc;

bool IsForward(int64_t op_role) {
return (op_role == static_cast<int64_t>(OpRole::kForward)) ||
(op_role == (static_cast<int64_t>(OpRole::kForward) |
static_cast<int64_t>(OpRole::kLoss)));
bool IsForward(int32_t op_role) {
return (op_role == static_cast<int32_t>(OpRole::kForward)) ||
(op_role == (static_cast<int32_t>(OpRole::kForward) |
static_cast<int32_t>(OpRole::kLoss)));
}

bool IsLRSched(int64_t op_role) {
return op_role == static_cast<int64_t>(OpRole::kLRSched);
bool IsLRSched(int32_t op_role) {
return op_role == static_cast<int32_t>(OpRole::kLRSched);
}

bool IsBackward(int64_t op_role) {
return (op_role == static_cast<int64_t>(OpRole::kBackward)) ||
(op_role == (static_cast<int64_t>(OpRole::kBackward) |
static_cast<int64_t>(OpRole::kLoss)));
bool IsBackward(int32_t op_role) {
return (op_role == static_cast<int32_t>(OpRole::kBackward)) ||
(op_role == (static_cast<int32_t>(OpRole::kBackward) |
static_cast<int32_t>(OpRole::kLoss)));
}

bool IsOptimize(int64_t op_role) {
return op_role == static_cast<int64_t>(OpRole::kOptimize);
bool IsOptimize(int32_t op_role) {
return op_role == static_cast<int32_t>(OpRole::kOptimize);
}

struct DistCoord {
Expand Down Expand Up @@ -112,9 +112,9 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
for (const auto& op_desc : program.Block(0).AllOps()) {
ops_.emplace_back(OpRegistry::CreateOp(*op_desc));
}
std::unordered_map<int64_t, std::vector<OperatorBase*>> role_to_ops;
std::unordered_map<int32_t, std::vector<OperatorBase*>> role_to_ops;
for (const auto& op : ops_) {
int64_t op_role = op->Attr<int64_t>("op_role");
int32_t op_role = op->Attr<int32_t>("op_role");
OpRole new_op_role;
if (IsLRSched(op_role)) {
new_op_role = OpRole::kLRSched;
Expand All @@ -129,7 +129,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
"The op %s is None of LRSched, Forward, Backward or Optimize.",
op->Type()));
}
int64_t new_op_role_id = static_cast<int64_t>(new_op_role);
int32_t new_op_role_id = static_cast<int32_t>(new_op_role);
if (role_to_ops.find(new_op_role_id) == role_to_ops.end()) {
role_to_ops.insert({new_op_role_id, {}});
}
Expand All @@ -147,7 +147,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
int64_t task_id = cur_rank * functionality_order.size();
for (std::size_t i = 0; i < functionality_order.size(); ++i) {
OpRole role = functionality_order[i];
int64_t role_id = static_cast<int64_t>(role);
int32_t role_id = static_cast<int64_t>(role);
int64_t max_run_times = num_micro_batches;
int64_t max_slot_nums = start_up_steps;
if (IsLRSched(role_id) || IsOptimize(role_id)) {
Expand Down Expand Up @@ -225,12 +225,22 @@ void RuntimeGraph::FakeRuntimeInfo() {
int64_t nrank = exe_desc_.cluster_info().size();
int32_t num_of_functionality = functionality_order.size();
for (int64_t i = 0; i < nrank; ++i) {
for (int64_t j = 0; j < num_of_functionality; ++j) {
for (int32_t j = 0; j < num_of_functionality; ++j) {
int64_t intercepter_id = i * num_of_functionality + j;
intercepter_id_to_rank_.insert({intercepter_id, i});
}
}
}

std::string RuntimeGraph::DebugString() const {
std::ostringstream os;
os << "\nRuntime Graph Debug: \n";
for (const auto& task : task_nodes_) {
os << task->DebugString();
os << "\n";
}
return os.str();
}

} // namespace distributed
} // namespace paddle
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/fleet_executor/runtime_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
Expand Down Expand Up @@ -43,6 +44,7 @@ class RuntimeGraph final {
const std::unordered_map<int64_t, int64_t>& intercepter_id_to_rank() const {
return intercepter_id_to_rank_;
}
std::string DebugString() const;

private:
DISABLE_COPY_AND_ASSIGN(RuntimeGraph);
Expand Down
18 changes: 14 additions & 4 deletions paddle/fluid/distributed/fleet_executor/task_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace {
using OperatorBase = TaskNode::OperatorBase;
}

TaskNode::TaskNode(int64_t role, const std::vector<OperatorBase*>& ops,
TaskNode::TaskNode(int32_t role, const std::vector<OperatorBase*>& ops,
int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums)
: ops_(ops),
Expand All @@ -31,15 +31,15 @@ TaskNode::TaskNode(int64_t role, const std::vector<OperatorBase*>& ops,
max_run_times_(max_run_times),
max_slot_nums_(max_slot_nums) {}

TaskNode::TaskNode(int64_t role, int64_t rank, int64_t task_id,
TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id,
int64_t max_run_times, int64_t max_slot_nums)
: role_(role),
rank_(rank),
task_id_(task_id),
max_run_times_(max_run_times),
max_slot_nums_(max_slot_nums) {}

std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int64_t role,
std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int32_t role,
int64_t rank,
int64_t task_id,
int64_t max_run_times,
Expand All @@ -49,7 +49,7 @@ std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int64_t role,
}

std::unique_ptr<TaskNode> TaskNode::CreateTaskNode(
int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
int64_t task_id, int64_t max_run_times, int64_t max_slot_nums) {
return std::make_unique<TaskNode>(role, ops, rank, task_id, max_run_times,
max_slot_nums);
Expand All @@ -60,5 +60,15 @@ void TaskNode::AddUpstreamTask(int64_t task_id) { upstream_.insert(task_id); }
void TaskNode::AddDownstreamTask(int64_t task_id) {
downstream_.insert(task_id);
}

std::string TaskNode::DebugString() const {
std::ostringstream os;
os << "role: " << role_ << ", task_id: " << task_id_ << "\n";
for (std::size_t i = 0; i < ops_.size(); ++i) {
os << ops_[i]->Type() << " ";
}
os << "\n";
return os.str();
}
} // namespace distributed
} // namespace paddle
13 changes: 7 additions & 6 deletions paddle/fluid/distributed/fleet_executor/task_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,28 @@ namespace distributed {
class TaskNode final {
public:
using OperatorBase = paddle::framework::OperatorBase;
TaskNode(int64_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums);
TaskNode(int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
TaskNode(int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);
~TaskNode() = default;
int64_t rank() const { return rank_; }
int64_t task_id() const { return task_id_; }
int64_t role() const { return role_; }
int32_t role() const { return role_; }
int64_t max_run_times() const { return max_run_times_; }
int64_t max_slot_nums() const { return max_slot_nums_; }
const std::unordered_set<int64_t>& upstream() const { return upstream_; }
const std::unordered_set<int64_t>& downstream() const { return downstream_; }
void AddUpstreamTask(int64_t task_id);
void AddDownstreamTask(int64_t task_id);
static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int64_t role,
std::string DebugString() const;
static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int32_t role,
int64_t rank,
int64_t task_id,
int64_t max_run_times,
int64_t max_slot_nums);
static std::unique_ptr<TaskNode> CreateTaskNode(
int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);

private:
Expand All @@ -57,7 +58,7 @@ class TaskNode final {
std::vector<OperatorBase*> ops_;
std::unordered_set<int64_t> upstream_;
std::unordered_set<int64_t> downstream_;
int64_t role_;
int32_t role_;
int64_t rank_;
int64_t task_id_;
int64_t max_run_times_;
Expand Down

0 comments on commit af83e79

Please sign in to comment.