Refactor apply transformer #36899
@@ -216,6 +216,129 @@ void apply_device_guard(const OperatorBase* op_base,
   }
 }

+std::vector<OpFuncNode> apply_data_transformer(VariableValueMap& ins_map_temp,
+                                               OpKernelType expected_kernel_key,
+                                               VariableScope* var_scope,
+                                               OpFuncNode& op_func_node,
+                                               const platform::Place& place) {
+  auto& op_base = op_func_node.operator_base_;
+  auto& op = op_base;
+  auto inputs_names = op->Inputs();
Review comment: op_base and op are the same pointer here, so why are two variables needed? Also, since this is a pointer type, add a PADDLE_ENFORCE_NOT_NULL check on it.
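A minimal sketch of what this comment suggests, assuming operator_base_ holds a raw OperatorBase pointer: drop the redundant op alias and guard the pointer before it is dereferenced.

    // Sketch only: single variable instead of the op/op_base aliases,
    // with a null check before the first dereference.
    auto* op_base = op_func_node.operator_base_;
    PADDLE_ENFORCE_NOT_NULL(
        op_base, platform::errors::InvalidArgument(
                     "operator_base_ of op_func_node should not be null."));
    auto inputs_names = op_base->Inputs();  // use op_base directly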
+
+  auto& all_op_kernels = OperatorWithKernel::AllOpKernels();
+  auto& ins_name2id = op_func_node.input_index;
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+
+  std::unordered_set<int>
+      no_data_transform_index;  // record the no need transform variable index.
+  std::vector<OpFuncNode> copy_func_nodes;  // return all the copy opfuncnode.
+
+  for (auto& var_name_item : ins_map_temp) { /*{{{*/
Review comment: remove /*{{{*/.
+    for (size_t i = 0; i < var_name_item.second.size(); ++i) {
+      auto var = var_name_item.second[i];
+      auto& var_name = inputs_names[var_name_item.first].at(i);
+      auto tensor_in = GetLoDTensorOrSelectedRowsValueFromVar(*var);
+      if (!tensor_in->IsInitialized()) {
+        continue;
+      }
+      auto kernel_type_for_var =  // the true kernel type for op_base
+          static_cast<const framework::OperatorWithKernel*>(op_base)
+              ->GetKernelTypeForVar(var_name_item.first, *tensor_in,
+                                    expected_kernel_key);
+      if (platform::is_same_place(kernel_type_for_var.place_,
+                                  expected_kernel_key.place_) ||
+          (is_cuda_pinned_place(kernel_type_for_var.place_) &&
+           is_cpu_place(expected_kernel_key.place_))) {
+        // record no need data transformer input var_id
+        VLOG(3) << op->Type() << " found no data_transform var: " << var_name
+                << " with id: " << var_name;
+        no_data_transform_index.emplace(var_scope->VarId(var_name));
+      } else {
+        if (op_base->Type() == "fetch_v2") {
+          op_base->SetAttr("deepcopy", false);
+        }
+        std::string new_var_name =
+            var_name + "_copy_" + std::to_string(var_scope->VarSize() + 1);
+        var_scope->AddVar(new_var_name, nullptr);
+
+        VariableNameMap copy_in_map;
+        copy_in_map["X"] = {var_name};
+        VariableNameMap copy_out_map;
+        copy_out_map["Out"] = {new_var_name};
+        AttributeMap attr_map;
+        attr_map["dst_place_type"] =
+            is_cpu_place(expected_kernel_key.place_)
+                ? 0
+                : is_gpu_place(expected_kernel_key.place_) ? 1 : -1;
+
+        std::map<std::string, std::vector<int>> copy_ins_name2id;
+        copy_ins_name2id["X"] = ins_name2id.at(var_name_item.first);
+        std::map<std::string, std::vector<int>> copy_out_name2id;
+        copy_out_name2id["Out"] = {var_scope->VarId(new_var_name)};
+
+        op_func_node.input_index[var_name_item.first][i] =
+            var_scope->VarId(new_var_name);
+
+        VariableValueMap copy_ins_value_map;
+        copy_ins_value_map["X"] = {var};
+        VariableValueMap copy_outs_value_map;
+        copy_outs_value_map["Out"] = {var_scope->Var(new_var_name)};
+
+        // memcpy_d2h, memcpy_h2d
+        auto memcpy_op_type = get_memcpy_type(kernel_type_for_var.place_,
+                                              expected_kernel_key.place_);
+        VLOG(3) << string::Sprintf("Insert %s with %s(%s) -> %s(%s).",
+                                   memcpy_op_type, var_name,
+                                   kernel_type_for_var.place_, new_var_name,
+                                   expected_kernel_key.place_);
+        auto& copy_info = OpInfoMap::Instance().Get(memcpy_op_type);
+        auto copy_op = copy_info.Creator()(memcpy_op_type, copy_in_map,
+                                           copy_out_map, attr_map);
+        OpFuncNode copy_op_func_node;
+        copy_op_func_node.input_index = copy_ins_name2id;
+        copy_op_func_node.output_index = copy_out_name2id;
+
+        RuntimeContext copy_runtime_context({}, {});
+        copy_runtime_context.inputs.swap(copy_ins_value_map);
+        copy_runtime_context.outputs.swap(copy_outs_value_map);
+        InterpretercoreInferShapeContext copy_infer_shape_ctx(
+            *copy_op, copy_runtime_context);
+        static_cast<const framework::OperatorWithKernel*>(copy_op)->InferShape(
+            &copy_infer_shape_ctx);
+
+        auto kernels_iter = all_op_kernels.find(memcpy_op_type);
+        PADDLE_ENFORCE_NE(kernels_iter, all_op_kernels.end(),
+                          platform::errors::Unavailable(
+                              "There are no kernels which are registered in "
+                              "the memcpy operator."));
+
+        OpKernelMap& kernels = kernels_iter->second;
+        auto* dev_ctx = pool.Get(place);
+        Scope scope;
+        auto copy_exec_ctx =
+            ExecutionContext(*copy_op, scope, *dev_ctx, copy_runtime_context);
+        auto expected_kernel_key =
+            dynamic_cast<const framework::OperatorWithKernel*>(copy_op)
+                ->GetExpectedKernelType(copy_exec_ctx);
+        auto kernel_iter = kernels.find(expected_kernel_key);
+        copy_op_func_node.kernel_func_ =
+            OpKernelComputeFunc(kernel_iter->second);
+        copy_op_func_node.kernel_func_(copy_exec_ctx);
+        VLOG(3) << "Run " << memcpy_op_type << " done.";
+        // NOTE(Aurelius84): memcpy_op is expensive operation, so we tag them
+        // as kQueueSync and execute them in thread pool.
+        copy_op_func_node.type_ = OpFuncType::kQueueSync;
+        copy_op_func_node.dev_ctx_ = dev_ctx;
+        copy_op_func_node.operator_base_ = copy_op;
+        copy_func_nodes.push_back(copy_op_func_node);
+
+        var_name_item.second[i] = var_scope->Var(new_var_name);
+      }
+    }
+  } /*}}}*/
+  op_func_node.no_data_transform_index = std::move(no_data_transform_index);
+  return copy_func_nodes;
+}
+
 void build_op_func_list(const platform::Place& place,
                         const framework::ProgramDesc& pdesc,
                         std::vector<OpFuncNode>* vec_func_list,
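The new function relies on get_memcpy_type to pick between the memcpy_d2h and memcpy_h2d operators (see the "// memcpy_d2h, memcpy_h2d" comment above). That helper is not part of this diff; the following is only a hedged sketch of the selection it presumably performs, under a hypothetical name so it does not clash with the real one.

    // Hypothetical sketch; the real get_memcpy_type is defined elsewhere.
    // Chooses the copy operator from the source and destination places.
    std::string get_memcpy_type_sketch(const platform::Place& src_place,
                                       const platform::Place& dst_place) {
      if (platform::is_gpu_place(src_place) && platform::is_cpu_place(dst_place)) {
        return "memcpy_d2h";  // device-to-host copy
      }
      if (platform::is_cpu_place(src_place) && platform::is_gpu_place(dst_place)) {
        return "memcpy_h2d";  // host-to-device copy
      }
      PADDLE_THROW(platform::errors::Unimplemented(
          "No memcpy op is supported for this place pair."));
    }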
@@ -272,123 +395,28 @@ void build_op_func_list(const platform::Place& place,
     Scope scope;
     auto expected_kernel_key =
         dynamic_cast<const framework::OperatorWithKernel*>(op_base)
-            ->GetExpectedKernelType(
+            ->GetExpectedKernelType(  // for all variables's type and context's
+                                      // place
Review comment: this comment is not very clear; improve it or remove it.
                 ExecutionContext(*op_base, scope, *dev_ctx, runtime_context));

     // consider device_guard()
-    apply_device_guard(op_base, place, &expected_kernel_key);
+    apply_device_guard(
+        op_base, place,
+        &expected_kernel_key);  // change device by the device_guard()
     VLOG(3) << "expected_kernel_key : " << expected_kernel_key;

-    // step 3. Insert memcpy_op if needed
+    // step 3. apply data transforms and insert memory ops
     VariableValueMap& ins_map_temp = runtime_context.inputs;
-    std::unordered_set<int> no_data_transform_index;
-
-    for (auto& var_name_item : ins_map_temp) {
-      for (size_t i = 0; i < var_name_item.second.size(); ++i) {
-        auto var = var_name_item.second[i];
-        auto& var_name = inputs_names[var_name_item.first].at(i);
-        auto tensor_in = GetLoDTensorOrSelectedRowsValueFromVar(*var);
-        if (!tensor_in->IsInitialized()) {
-          continue;
-        }
-        auto kernel_type_for_var =
-            static_cast<const framework::OperatorWithKernel*>(op_base)
-                ->GetKernelTypeForVar(var_name_item.first, *tensor_in,
-                                      expected_kernel_key);
-        if (platform::is_same_place(kernel_type_for_var.place_,
-                                    expected_kernel_key.place_) ||
-            (is_cuda_pinned_place(kernel_type_for_var.place_) &&
-             is_cpu_place(expected_kernel_key.place_))) {
-          // record no need data transformer input var_id
-          VLOG(3) << op->Type() << " found no data_transform var: " << var_name
-                  << " with id: " << var_name;
-          no_data_transform_index.emplace(var_scope->VarId(var_name));
-        } else {
-          if (op_base->Type() == "fetch_v2") {
-            op_base->SetAttr("deepcopy", false);
-          }
-          std::string new_var_name =
-              var_name + "_copy_" + std::to_string(var_scope->VarSize() + 1);
-          var_scope->AddVar(new_var_name, nullptr);
-
-          VariableNameMap copy_in_map;
-          copy_in_map["X"] = {var_name};
-          VariableNameMap copy_out_map;
-          copy_out_map["Out"] = {new_var_name};
-          AttributeMap attr_map;
-          attr_map["dst_place_type"] =
-              is_cpu_place(expected_kernel_key.place_)
-                  ? 0
-                  : is_gpu_place(expected_kernel_key.place_) ? 1 : -1;
-
-          std::map<std::string, std::vector<int>> copy_ins_name2id;
-          copy_ins_name2id["X"] = ins_name2id.at(var_name_item.first);
-          std::map<std::string, std::vector<int>> copy_out_name2id;
-          copy_out_name2id["Out"] = {var_scope->VarId(new_var_name)};
-
-          op_func_node.input_index[var_name_item.first][i] =
-              var_scope->VarId(new_var_name);
-
-          VariableValueMap copy_ins_value_map;
-          copy_ins_value_map["X"] = {var};
-          VariableValueMap copy_outs_value_map;
-          copy_outs_value_map["Out"] = {var_scope->Var(new_var_name)};
-
-          // memcpy_d2h, memcpy_h2d
-          auto memcpy_op_type = get_memcpy_type(kernel_type_for_var.place_,
-                                                expected_kernel_key.place_);
-          VLOG(3) << string::Sprintf("Insert %s with %s(%s) -> %s(%s).",
-                                     memcpy_op_type, var_name,
-                                     kernel_type_for_var.place_, new_var_name,
-                                     expected_kernel_key.place_);
-          auto& copy_info = OpInfoMap::Instance().Get(memcpy_op_type);
-          auto copy_op = copy_info.Creator()(memcpy_op_type, copy_in_map,
-                                             copy_out_map, attr_map);
-          OpFuncNode copy_op_func_node;
-          copy_op_func_node.input_index = copy_ins_name2id;
-          copy_op_func_node.output_index = copy_out_name2id;
-
-          RuntimeContext copy_runtime_context({}, {});
-          copy_runtime_context.inputs.swap(copy_ins_value_map);
-          copy_runtime_context.outputs.swap(copy_outs_value_map);
-          InterpretercoreInferShapeContext copy_infer_shape_ctx(
-              *copy_op, copy_runtime_context);
-          static_cast<const framework::OperatorWithKernel*>(copy_op)
-              ->InferShape(&copy_infer_shape_ctx);
-
-          auto kernels_iter = all_op_kernels.find(memcpy_op_type);
-          PADDLE_ENFORCE_NE(kernels_iter, all_op_kernels.end(),
-                            platform::errors::Unavailable(
-                                "There are no kernels which are registered in "
-                                "the memcpy operator."));
-
-          OpKernelMap& kernels = kernels_iter->second;
-          auto* dev_ctx = pool.Get(place);
-          Scope scope;
-          auto copy_exec_ctx =
-              ExecutionContext(*copy_op, scope, *dev_ctx, copy_runtime_context);
-          auto expected_kernel_key =
-              dynamic_cast<const framework::OperatorWithKernel*>(copy_op)
-                  ->GetExpectedKernelType(copy_exec_ctx);
-          auto kernel_iter = kernels.find(expected_kernel_key);
-          copy_op_func_node.kernel_func_ =
-              OpKernelComputeFunc(kernel_iter->second);
-          copy_op_func_node.kernel_func_(copy_exec_ctx);
-          VLOG(3) << "Run " << memcpy_op_type << " done.";
-          // NOTE(Aurelius84): memcpy_op is expensive operation, so we tag them
-          // as kQueueSync and execute them in thread pool.
-          copy_op_func_node.type_ = OpFuncType::kQueueSync;
-          copy_op_func_node.dev_ctx_ = dev_ctx;
-          copy_op_func_node.operator_base_ = copy_op;
-          vec_func_list->push_back(copy_op_func_node);
-
-          var_name_item.second[i] = var_scope->Var(new_var_name);
-        }
-      }
-    }
+    std::vector<OpFuncNode> copy_op_to_insert;
+    // NOTE(xiongkun03): assign op_base here to reduce parameter number of
+    // apply_data_transformer.
+    op_func_node.operator_base_ = op_base;
+    copy_op_to_insert = apply_data_transformer(
+        ins_map_temp, expected_kernel_key, var_scope, op_func_node, place);
+    for (auto& item : copy_op_to_insert) {
+      vec_func_list->push_back(item);
+    }
-    op_func_node.no_data_transform_index = std::move(no_data_transform_index);
     // step 4. Run op kernel
-    op_func_node.operator_base_ = op_base;
     VLOG(3) << op_base->Type()
             << " : expected_kernel_key : " << expected_kernel_key;
Review comment: apply_data_transformer -> apply_data_transform
Review comment: in the function signature, put the const parameters first and the non-const ones after.
Review comment: is ins_map_temp modified inside the function? If not, it should be const-qualified. Please double-check expected_kernel_key as well.
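Putting the three comments above together, the declaration might end up as the following sketch (rename applied, const parameters grouped first; ins_map_temp has to stay non-const because the loop reassigns var_name_item.second[i] to the copied variable):

    // Sketch of a revised declaration following the review comments.
    std::vector<OpFuncNode> apply_data_transform(
        const OpKernelType& expected_kernel_key,  // read-only, so const ref
        const platform::Place& place,
        VariableValueMap& ins_map_temp,  // mutated: inputs redirected to copies
        VariableScope* var_scope,        // mutated: copy variables are added
        OpFuncNode& op_func_node);       // mutated: input_index is rewritten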