[cherry-pick] Cherry pick pr of new-exec #42009

Merged · 3 commits · Apr 20, 2022
94 changes: 61 additions & 33 deletions paddle/fluid/framework/new_executor/interpretercore.cc
@@ -121,13 +121,19 @@ paddle::framework::FetchList InterpreterCore::Run(
Prepare(feed_names, feed_tensors, is_build);

if (is_build) {
// add listener before run and is_build=true
global_scope_->ResetListener();

ExecuteInstructionList(vec_instruction_);
}

if (create_local_scope_) {
ClearLoDTensorArrayInLocalScope();
}

// clear the listener after run
global_scope_->ClearListener();

// return Fetch Tensors
auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
return std::move(*fetch_var->GetMutable<framework::FetchList>());
@@ -162,13 +168,19 @@ paddle::framework::FetchList InterpreterCore::Run(
Convert(&op_func_nodes);

} else {
// add listener before run and is_build=true
global_scope_->ResetListener();

ExecuteInstructionList(vec_instruction_);
}

if (create_local_scope_) {
ClearLoDTensorArrayInLocalScope();
}

// clear the listener after run
global_scope_->ClearListener();

// return Fetch Tensors
auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
return std::move(*fetch_var->GetMutable<framework::FetchList>());
@@ -192,7 +204,8 @@ void InterpreterCore::BuildOperatorDependences() {
// Schedule
auto op_nums = vec_instruction_.size();
dependecy_count_.resize(op_nums);
auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
auto op2downstream = interpreter::build_op_downstream_map(
vec_instruction_, &op_happens_before_);
for (size_t op = 0; op < vec_instruction_.size(); ++op) {
auto op_list = op2downstream[op];
std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
@@ -213,57 +226,46 @@ void InterpreterCore::Convert(

auto op_nums = nodes.size();
vec_instruction_.reserve(op_nums);

for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
auto& op_func_node = nodes[op_idx];
auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);

vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
auto& instr = vec_instruction_.back();
}

BuildOperatorDependences();

// calculate last_live_ops_
for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
auto& instr = vec_instruction_[op_idx];
OpInOutInfo info;
std::vector<size_t> gc_check_input_list;
std::set<size_t> gc_check_inputs;

for (auto& item : op_func_node.input_index) {
for (auto& item : instr.Inputs()) {
for (auto id : item.second) {
if (id == kEmptyVarIndex) {
continue;
}
input_var2op_info_.at(id).push_back(op_idx);
// var can be gc-ed
if (!info.IsBuilt()) {
info.Build(op_func_node.operator_base_.get());
info.Build(instr.OpBase());
}
auto* var_desc = global_scope_->VarDesc(id);
if (var_desc) {
if (info.IsInArgBufferNeeded(var_desc->Name())) {
gc_check_input_list.push_back(id);
gc_check_inputs.insert(id);
}
} else {
gc_check_input_list.push_back(id);
gc_check_inputs.insert(id);
}
}
}
std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
auto last =
std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
gc_check_input_list.erase(last, gc_check_input_list.end());

for (auto var_id : gc_check_input_list) {
for (auto var_id : gc_check_inputs) {
paddle::framework::Variable* var = global_scope_->Var(var_id);
if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
var->IsType<LoDTensorArray>()) {
vec_meta_info[var_id].var_ref_count_++;
// TODO(zhiqiu): not all var needs to be checked, var need to be checked
// only
// after the last_live_op. For example,
// b = op1(a)
// c = op2(a, b)
// in this case, a is the input of op1 and op2, we only need to check
// a after op2, because op2 always uses a after op1.
instr.AddGCCheckVar(var_id);
VLOG(4) << "clear " << global_scope_->GetNameById(var_id) << " after "
<< instr.OpBase()->Type();
last_live_ops_[var_id].insert(op_idx);
} else {
VLOG(4) << "not clear " << global_scope_->GetNameById(var_id)
<< " after " << instr.OpBase()->Type()
@@ -276,19 +278,45 @@
for (size_t i = 0; i < vec_instruction_.size(); ++i) {
// check output
for (auto& item : vec_instruction_[i].Outputs()) {
for (auto id : item.second) {
if (input_var2op_info_.at(id).size() == 0) {
// output var not be used by any kernel
vec_instruction_[i].AddGCCheckVar(id);
VLOG(4) << "clear " << global_scope_->GetNameById(id) << " after "
<< vec_instruction_[i].OpBase()->Type();
vec_meta_info[id].var_ref_count_++;
for (auto var_id : item.second) {
if (input_var2op_info_.at(var_id).size() == 0) {
last_live_ops_[var_id].insert(i);
}
}
}
}

BuildOperatorDependences();
// shrink: keep only the ops that do not happen before any other op in
// the same last-live set
// For example,
// b = op1(a)
// c = op2(a, b)
// in this case, a is the input of op1 and op2, we only need to check
// a after op2, because op2 always uses a after op1.
for (size_t i = 0; i < last_live_ops_.size(); ++i) {
std::set<size_t> minumum_last_live_ops;
for (size_t item : last_live_ops_[i]) {
bool not_before_any = true;
// find the op that is not executed before any
for (size_t other_item : last_live_ops_[i]) {
if (op_happens_before_[item][other_item]) {
VLOG(8) << "happens_before: " << item << "->" << other_item
<< ", so skip " << item;
not_before_any = false;
break;
}
}
if (not_before_any) {
VLOG(8) << "last live op of var " << i << " "
<< global_scope_->GetNameById(i) << " : " << item << " "
<< vec_instruction_[item].OpBase()->Type();
minumum_last_live_ops.insert(item);
vec_instruction_[item].AddGCCheckVar(i);
}
}
last_live_ops_[i] = minumum_last_live_ops;
vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
}

for (size_t i = 0; i < vec_instruction_.size(); ++i) {
BuildAndCacheInstructionCtx(&vec_instruction_[i]);
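The shrink logic added to Convert() above can be illustrated on its own. Below is a minimal, self-contained sketch (not the Paddle implementation; the names and the toy happens-before matrix are assumptions) of the same idea for the example in the comment, b = op0(a); c = op1(a, b): op0 always happens before op1, so only op1 needs to keep a GC reference for a.

```cpp
// Standalone sketch of the "shrink last_live_ops" step: for each variable,
// keep only the accessing ops that do not happen before another accessing op.
#include <cstdio>
#include <map>
#include <set>
#include <vector>

int main() {
  // happens_before[i][j] == true means op i always finishes before op j starts.
  // Toy graph: b = op0(a); c = op1(a, b)  =>  op0 happens before op1.
  std::vector<std::vector<bool>> happens_before = {{false, true},
                                                   {false, false}};

  // Variable "a" (id 0) is read by both op0 and op1.
  std::map<size_t, std::set<size_t>> last_live_ops = {{0, {0, 1}}};

  for (auto& [var_id, ops] : last_live_ops) {
    std::set<size_t> minimum_last_live_ops;
    for (size_t op : ops) {
      bool not_before_any = true;
      for (size_t other : ops) {
        if (happens_before[op][other]) {  // op is not the last user of the var
          not_before_any = false;
          break;
        }
      }
      if (not_before_any) minimum_last_live_ops.insert(op);
    }
    ops = minimum_last_live_ops;
    // The GC reference count is the number of remaining last users.
    std::printf("var %zu: ref_count = %zu\n", var_id, ops.size());
  }
}
```

Compiled and run, the sketch prints `var 0: ref_count = 1`: only the truly last user of a variable stays in its last-live set, so the variable is released exactly once, after op1.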
5 changes: 5 additions & 0 deletions paddle/fluid/framework/new_executor/interpretercore.h
@@ -109,6 +109,11 @@ class InterpreterCore {

std::vector<Instruction> vec_instruction_; // deconstruct before OpFuncNode

// op_happens_before_[i][j] == true means op[i] happens before op[j]
std::vector<std::vector<bool>> op_happens_before_;
// last_live_ops_[i] contains the ids of operators that last access var[i]
std::map<size_t, std::set<size_t>> last_live_ops_;

std::vector<size_t> dependecy_count_;
std::atomic<size_t> unfinished_op_numer_{0};
std::vector<std::vector<size_t>> input_var2op_info_;
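For context on the new `op_happens_before_` member, here is a hedged sketch of one way such a matrix could be derived from the op-to-downstream map used in BuildOperatorDependences(), via transitive reachability over downstream edges. The actual matrix is filled inside interpreter::build_op_downstream_map, which this sketch does not reproduce; the helper name, signature, and toy graph below are assumptions for illustration only.

```cpp
#include <cstddef>
#include <cstdio>
#include <map>
#include <vector>

// Derive a happens-before matrix from an op -> downstream-ops map by DFS:
// every op reachable through downstream edges is guaranteed to run after src.
std::vector<std::vector<bool>> BuildHappensBefore(
    const std::map<size_t, std::vector<size_t>>& op2downstream,
    size_t op_num) {
  std::vector<std::vector<bool>> happens_before(
      op_num, std::vector<bool>(op_num, false));
  for (size_t src = 0; src < op_num; ++src) {
    std::vector<size_t> stack = {src};
    while (!stack.empty()) {
      size_t cur = stack.back();
      stack.pop_back();
      auto it = op2downstream.find(cur);
      if (it == op2downstream.end()) continue;
      for (size_t next : it->second) {
        if (!happens_before[src][next]) {
          happens_before[src][next] = true;
          stack.push_back(next);
        }
      }
    }
  }
  return happens_before;
}

int main() {
  // Toy graph: op0 feeds op1, op1 feeds op2.
  std::map<size_t, std::vector<size_t>> op2downstream = {{0, {1}}, {1, {2}}};
  auto hb = BuildHappensBefore(op2downstream, 3);
  std::printf("op0 before op2: %d\n", static_cast<int>(hb[0][2]));  // prints 1
}
```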