From 33ab12e2f840aaef2ad3f6e9521af2f45b809513 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 20 Jun 2018 10:04:22 +0800
Subject: [PATCH 01/11] fix pserver sub-blocks

---
 paddle/fluid/operators/listen_and_serv_op.cc       | 10 +++++++++-
 .../fluid/transpiler/distribute_transpiler.py      | 16 +++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 6086c31722c1a2..1ae940b7b23827 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -101,13 +101,16 @@ void ListenAndServOp::RunSyncLoop(
     framework::Scope *recv_scope,
     const std::vector<int> &prefetch_block_id_list) const {
   size_t num_blocks = program->Size();
+  auto skip_sub_blks = Attr<std::vector<int>>("skip_sub_blks");
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
   std::vector<int> optimize_block_id_list;
   for (int blkid = 1; blkid < num_blocks; ++blkid) {
     if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(),
-                  blkid) == prefetch_block_id_list.end()) {
+                  blkid) == prefetch_block_id_list.end() &&
+        std::find(skip_sub_blks.begin(), skip_sub_blks.end(), blkid) ==
+            skip_sub_blks.end()) {
       optimize_block_id_list.push_back(blkid);
     }
   }
@@ -344,6 +347,11 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
         .SetDefault({});
     AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
+    AddAttr<std::vector<int>>("skip_sub_blks",
+                              "do not execute the specified sub-blocks in parallel; "
+                              "used for ops that have "
+                              "conditional blocks")
+        .SetDefault({});
   }
 };

diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index d8d6a7e9418e1c..137e88a046481d 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -396,9 +396,9 @@ def __op_have_grad_input__(op):
                     return varname
             return ""
 
-        def __clone_lr_op_sub_block__(op, program, new_block):
+        def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
             if not op.has_attr('sub_block'):
-                return
+                return -1
 
             origin_block_desc = op.attr('sub_block')
             origin_block = self.origin_program.block(origin_block_desc.id)
@@ -406,6 +406,7 @@ def __clone_lr_op_sub_block__(op, program, new_block):
             # we put the new sub block to new block to follow the block
             # hierarchy of the original blocks
             new_sub_block = program.create_block(new_block.idx)
+            skip_sub_blks(new_sub_block.idx)
 
             # clone vars
             for var in origin_block.vars:
@@ -415,20 +416,24 @@ def __clone_lr_op_sub_block__(op, program, new_block):
             for op in origin_block.ops:
                 self._clone_lr_op(program, new_sub_block, op)
                 # clone sub_block of op
-                __clone_lr_op_sub_block__(op, program, new_sub_block)
+                __clone_lr_op_sub_block__(op, program, new_sub_block,
+                                          skip_sub_blks)
 
             # reset the block of op
             op.set_attr('sub_block', new_sub_block)
+            return new_sub_block.idx
 
         # append lr decay ops to the child block if exists
         lr_ops = self._get_lr_ops()
+        skip_sub_blks = []
        if len(lr_ops) > 0:
             lr_decay_block = pserver_program.create_block(
                 pserver_program.num_blocks - 1)
             for _, op in enumerate(lr_ops):
                 self._append_pserver_non_opt_ops(lr_decay_block, op)
                 # append sub blocks to pserver_program in lr_decay_op
-                __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block)
+                __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block,
+                                          skip_sub_blks)
 
         # append op to the current block
         grad_to_block_id = []
@@ -478,7 +483,8 @@ def __clone_lr_op_sub_block__(op, program, new_block):
             "endpoint": endpoint,
             "Fanin": self.trainer_num,
             "sync_mode": self.sync_mode,
-            "grad_to_block_id": grad_to_block_id
+            "grad_to_block_id": grad_to_block_id,
+            "skip_sub_blks": skip_sub_blks
         }
         if len(prefetch_var_name_to_block_id) > 0:
             attrs['prefetch_var_name_to_block_id'] \
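Note: the background for this first patch is that the parameter server treats every block of the server program except block 0 (which holds the listen_and_serv op itself) and the prefetch blocks as a top-level optimize block and may execute it in parallel. Sub-blocks that belong to an op carrying a `sub_block` attribute (e.g. conditional blocks cloned for learning-rate decay) must only run through their parent op, never standalone, so the transpiler records their ids in the new `skip_sub_blks` attribute. A minimal Python sketch of the selection rule the C++ loop above implements (function and variable names are illustrative, not PaddlePaddle API):

    def pick_optimize_blocks(num_blocks, prefetch_block_ids, skip_sub_blks):
        """Return block ids the pserver may run as top-level optimize blocks."""
        optimize_block_ids = []
        for blkid in range(1, num_blocks):
            # A block recorded in skip_sub_blks belongs to a control-flow op
            # and must only be executed through its parent op.
            if blkid not in prefetch_block_ids and blkid not in skip_sub_blks:
                optimize_block_ids.append(blkid)
        return optimize_block_ids

    # e.g. a program with 5 blocks, where block 3 prefetches and block 4 is a
    # conditional sub-block of an lr-decay op:
    assert pick_optimize_blocks(5, {3}, {4}) == [1, 2]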
From 8a264add52d479b32a97a000c432146f58de430d Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 20 Jun 2018 10:08:48 +0800
Subject: [PATCH 02/11] update

---
 python/paddle/fluid/transpiler/distribute_transpiler.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 137e88a046481d..ecde55ac6865ce 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -406,7 +406,7 @@ def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
             # we put the new sub block to new block to follow the block
             # hierarchy of the original blocks
             new_sub_block = program.create_block(new_block.idx)
-            skip_sub_blks(new_sub_block.idx)
+            skip_sub_blks.append(new_sub_block.idx)
 
             # clone vars
             for var in origin_block.vars:
@@ -421,7 +421,6 @@ def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
 
             # reset the block of op
             op.set_attr('sub_block', new_sub_block)
-            return new_sub_block.idx
 
         # append lr decay ops to the child block if exists
         lr_ops = self._get_lr_ops()

From 2603d8158c4da2c42d09b73fb5ce514ac7bca2c2 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 20 Jun 2018 18:06:26 +0800
Subject: [PATCH 03/11] Merge branch 'develop' of github.com:PaddlePaddle/Paddle into fix_pserver_sub_blocks

---
 doc/fluid/api/transpiler.rst                             | 6 ------
 python/paddle/fluid/transpiler/distribute_transpiler.py | 2 +-
 2 files changed, 1 insertion(+), 7 deletions(-)

diff --git a/doc/fluid/api/transpiler.rst b/doc/fluid/api/transpiler.rst
index 943d39331d26c0..964ce22d4b0ce2 100644
--- a/doc/fluid/api/transpiler.rst
+++ b/doc/fluid/api/transpiler.rst
@@ -1,11 +1,6 @@
 ..  THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
     !DO NOT EDIT THIS FILE MANUALLY!
 
-================
-fluid.transpiler
-================
-
-.. _api_fluid_transpiler_DistributeTranspiler:
 
 DistributeTranspiler
 --------------------
@@ -47,4 +42,3 @@ RoundRobin
 .. autoclass:: paddle.fluid.transpiler.RoundRobin
     :members:
     :noindex:
-
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index ecde55ac6865ce..dc0ec6b8c6c144 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -398,7 +398,7 @@ def __op_have_grad_input__(op):
 
         def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
             if not op.has_attr('sub_block'):
-                return -1
+                return
 
             origin_block_desc = op.attr('sub_block')
             origin_block = self.origin_program.block(origin_block_desc.id)
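Note: the `__clone_lr_op_sub_block__` helper touched by the first three patches copies an op's whole sub-block hierarchy into the pserver program. The toy sketch below reduces the intended behavior to plain Python (the classes and method names are stand-ins, not the Fluid API); note that re-pointing the *clone* rather than the original op, as done here, is only achieved by patch 10 at the end of this series:

    class ToyBlock(object):
        def __init__(self, idx, parent_idx=None):
            self.idx, self.parent_idx = idx, parent_idx
            self.ops = []

    class ToyProgram(object):
        def __init__(self):
            self.blocks = [ToyBlock(0)]
        def create_block(self, parent_idx):
            blk = ToyBlock(len(self.blocks), parent_idx)
            self.blocks.append(blk)
            return blk

    class ToyOp(object):
        def __init__(self, op_type, sub_block=None):
            self.type, self.sub_block = op_type, sub_block

    def clone_op_with_sub_block(program, dst_block, op):
        # clone the op itself first (Fluid does this via _clone_lr_op)
        cloned = ToyOp(op.type, op.sub_block)
        dst_block.ops.append(cloned)
        if op.sub_block is None:
            return cloned
        # clone the sub-block under the destination block, preserving hierarchy
        new_sub = program.create_block(parent_idx=dst_block.idx)
        for inner in op.sub_block.ops:
            clone_op_with_sub_block(program, new_sub, inner)  # recurse
        cloned.sub_block = new_sub  # re-point the clone, not the original
        return cloned

    prog = ToyProgram()
    root = prog.blocks[0]
    cond = ToyBlock(99)                 # stands in for a trainer-side sub-block
    cond.ops.append(ToyOp('scale'))
    lr_op = ToyOp('lr_decay', sub_block=cond)
    cloned = clone_op_with_sub_block(prog, root, lr_op)
    assert cloned.sub_block.parent_idx == root.idx  # hierarchy preserved
    assert lr_op.sub_block is cond                  # original left untouched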
From f0dc5761725ca1275d54bd2f8c846bc52d4e95cd Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Thu, 21 Jun 2018 17:32:35 +0800
Subject: [PATCH 04/11] use optimize blocks attr to record optimize block id

---
 paddle/fluid/operators/listen_and_serv_op.cc       | 29 +++++++++++-----------
 paddle/fluid/operators/listen_and_serv_op.h        |  2 +-
 .../fluid/transpiler/distribute_transpiler.py      |  9 ++++++---
 3 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 1ae940b7b23827..4f364667630e7b 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -106,13 +106,8 @@ void ListenAndServOp::RunSyncLoop(
                     "server program should have at least 2 blocks");
 
   std::vector<int> optimize_block_id_list;
-  for (int blkid = 1; blkid < num_blocks; ++blkid) {
-    if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(),
-                  blkid) == prefetch_block_id_list.end() &&
-        std::find(skip_sub_blks.begin(), skip_sub_blks.end(), blkid) ==
-            skip_sub_blks.end()) {
-      optimize_block_id_list.push_back(blkid);
-    }
+  for (auto *block : optimize_blocks) {
+    optimize_block_id_list.push_back(block->ID());
   }
   auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list);
   // Insert placeholder for block0 which holds current op itself.
@@ -137,9 +132,9 @@ void ListenAndServOp::RunSyncLoop(
   // and this will still work.
   // The optimize blocks which have the same parent ID would run parallel
   // TODO(Yancey1989): need to use ParallelExecutor for future
-  int32_t last_parent_blkid = program->Block(1).Parent();
+  int32_t last_parent_blkid = optimize_blocks[0]->Parent();
   std::vector<size_t> parallel_blkids;
-  parallel_blkids.push_back(1);
+  parallel_blkids.push_back(optimize_blocks[0]->ID());
   double ts = GetTimestamp();
   for (size_t i = 1; i < optimize_block_id_list.size(); ++i) {
     // skip the first optimize block because it is already in the
@@ -262,8 +257,11 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   rpc_service_->RegisterRPC(distributed::kRequestPrefetch,
                             request_prefetch_handler_.get());
 
-  auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
-  auto *program = optimize_block->Program();
+  auto optimize_blocks =
+      Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
+  PADDLE_ENFORCE(optimize_blocks.size() > 1,
+                 "there should be at least 1 optimize block on the pserver side.");
+  auto *program = optimize_block[0]->Program();
   framework::Executor executor(dev_place);
 
   // prepare for prefetch
@@ -340,18 +338,13 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
                                       "a map from grad name to it's optimize block id")
         .SetDefault({});
     AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
-    AddAttr<framework::BlockDesc *>(kOptimizeBlock,
-                                    "BlockID to run on server side.");
+    AddAttr<framework::BlockDesc *>(kOptimizeBlocks,
+                                    "Optimize blocks to run on server side.");
     AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
                                       "prefetch blocks to run on server side.")
         .SetDefault({});
     AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
-    AddAttr<std::vector<int>>("skip_sub_blks",
-                              "do not execute the specified sub-blocks in parallel; "
-                              "used for ops that have "
-                              "conditional blocks")
-        .SetDefault({});
   }
 };

diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h
index 9aa322ad602d7a..634c1b4f4b541b 100644
--- a/paddle/fluid/operators/listen_and_serv_op.h
+++ b/paddle/fluid/operators/listen_and_serv_op.h
@@ -30,7 +30,7 @@ limitations under the License.
 */
 
 namespace paddle {
 namespace operators {
 
-constexpr char kOptimizeBlock[] = "OptimizeBlock";
+constexpr char kOptimizeBlocks[] = "optimize_blocks";
 constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id";
 
 void RunServer(std::shared_ptr<distributed::RPCServer> service);

diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index dc0ec6b8c6c144..cf59808b3d2e46 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -424,10 +424,12 @@ def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
 
         # append lr decay ops to the child block if exists
         lr_ops = self._get_lr_ops()
-        skip_sub_blks = []
+        # record optimize blocks so we can run them on the pserver in parallel
+        optimize_blocks = []
         if len(lr_ops) > 0:
             lr_decay_block = pserver_program.create_block(
                 pserver_program.num_blocks - 1)
+            optimize_blocks.append(lr_decay_block)
             for _, op in enumerate(lr_ops):
                 self._append_pserver_non_opt_ops(lr_decay_block, op)
                 # append sub blocks to pserver_program in lr_decay_op
@@ -439,6 +441,7 @@ def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
         pre_block_idx = pserver_program.num_blocks - 1
         for idx, opt_op in enumerate(opt_op_on_pserver):
             per_opt_block = pserver_program.create_block(pre_block_idx)
+            optimize_blocks.append(per_opt_block)
             # append grad merging ops before clip and weight decay
             for _, op in enumerate(self.optimize_ops):
                 # find the origin @GRAD var before clipping
@@ -457,6 +460,7 @@ def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
         if global_ops:
             opt_state_block = pserver_program.create_block(
                 pserver_program.num_blocks - 1)
+            optimize_blocks.append(opt_state_block)
             for glb_op in global_ops:
                 __append_optimize_op__(glb_op, opt_state_block,
                                        grad_to_block_id, None)
@@ -478,12 +482,11 @@ def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
             assert len(prefetch_var_name_to_block_id) == 0
 
         attrs = {
-            "OptimizeBlock": pserver_program.block(1),
+            "optimize_blocks": optimize_blocks,
             "endpoint": endpoint,
             "Fanin": self.trainer_num,
             "sync_mode": self.sync_mode,
             "grad_to_block_id": grad_to_block_id,
-            "skip_sub_blks": skip_sub_blks
         }
         if len(prefetch_var_name_to_block_id) > 0:
             attrs['prefetch_var_name_to_block_id'] \
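Note: patch 04 inverts the approach. Instead of blacklisting sub-blocks, the transpiler now records the optimize blocks explicitly and ships them to the server as a list-of-blocks attribute; on the server, consecutive optimize blocks that share a parent are executed as one parallel batch. Sketched in Python (names hypothetical, mirroring the `last_parent_blkid` loop in RunSyncLoop):

    def schedule_parallel_batches(optimize_blocks):
        batches, current, last_parent = [], [], None
        for block in optimize_blocks:
            if current and block.parent_idx != last_parent:
                batches.append(current)  # parent changed: flush the batch
                current = []
            current.append(block.idx)
            last_parent = block.parent_idx
        if current:
            batches.append(current)
        return batches  # the blocks inside each batch may run in parallel

    # e.g. blocks 1 and 2 share parent 0 and run together; block 3 runs after
    class B(object):  # stand-in for a BlockDesc
        def __init__(self, idx, parent_idx):
            self.idx, self.parent_idx = idx, parent_idx

    assert schedule_parallel_batches([B(1, 0), B(2, 0), B(3, 2)]) == [[1, 2], [3]]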
From 1e9a1dd3a11e5ba6bd7d4cf60fc007ad1404450a Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Thu, 21 Jun 2018 17:33:10 +0800
Subject: [PATCH 05/11] Merge branch 'develop' of github.com:PaddlePaddle/Paddle into fix_pserver_sub_blocks

---
 python/paddle/fluid/transpiler/distribute_transpiler.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index cf59808b3d2e46..391dddcf3e9c4f 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -880,8 +880,7 @@ def _create_table_optimize_block(self, pserver_index, pserver_program,
             table_opt_block.append_op(
                 type="sum",
                 inputs={"X": pserver_side_table_grad_list},
-                outputs={"Out": [grad_var]},
-                attrs={"use_mkldnn": False})
+                outputs={"Out": [grad_var]})
         else:
             # in async_mode, for table gradient, it also need to be splited to each parameter server
             origin_grad_name = grad_var.name
@@ -1113,8 +1112,7 @@ def _append_pserver_grad_merge_ops(self, optimize_block,
             optimize_block.append_op(
                 type="sum",
                 inputs={"X": vars2merge},
-                outputs={"Out": merged_var},
-                attrs={"use_mkldnn": False})
+                outputs={"Out": merged_var})
         # TODO(panyx0718): What if it's SELECTED_ROWS.
         if not merged_var.type == core.VarDesc.VarType.SELECTED_ROWS:
             optimize_block.append_op(

From b01ef70ced503a1c22ca8f07fd163fd57186ac3f Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Fri, 22 Jun 2018 12:31:46 +0800
Subject: [PATCH 06/11] use optimize block list instead of first optimize block

---
 paddle/fluid/framework/framework.proto             |  1 +
 paddle/fluid/framework/op_desc.cc                  | 13 +++++++++++++
 paddle/fluid/framework/op_desc.h                   |  2 ++
 paddle/fluid/framework/type_defs.h                 |  3 ++-
 paddle/fluid/operators/listen_and_serv_op.cc       | 17 +++++++----------
 paddle/fluid/pybind/protobuf.cc                    |  1 +
 python/paddle/fluid/framework.py                   |  6 ++++++
 .../fluid/transpiler/distribute_transpiler.py      |  9 +++------
 8 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto
index 68fcc104d48b2b..8f73b3d478e6da 100644
--- a/paddle/fluid/framework/framework.proto
+++ b/paddle/fluid/framework/framework.proto
@@ -46,6 +46,7 @@ message OpDesc {
     repeated bool bools = 11;
     optional int32 block_idx = 12;
     optional int64 l = 13;
+    repeated int32 blocks_idx = 14;
   };
 
   message Var {

diff --git a/paddle/fluid/framework/op_desc.cc b/paddle/fluid/framework/op_desc.cc
index f92769192c218e..a190199f1cb136 100644
--- a/paddle/fluid/framework/op_desc.cc
+++ b/paddle/fluid/framework/op_desc.cc
@@ -211,6 +211,12 @@ void OpDesc::SetBlockAttr(const std::string &name, BlockDesc *block) {
   need_update_ = true;
 }
 
+void OpDesc::SetBlocksAttr(const std::string &name,
+                           std::vector<BlockDesc *> blocks) {
+  this->attrs_[name] = blocks;
+  need_update_ = true;
+}
+
 void OpDesc::SetAttrMap(
     const std::unordered_map<std::string, Attribute> &attr_map) {
   attrs_ = attr_map;
@@ -305,6 +311,13 @@ struct SetAttrDescVisitor : public boost::static_visitor<void> {
   void operator()(const std::vector<bool> &v) const {
     VectorToRepeated(v, attr_->mutable_bools());
   }
+  void operator()(const std::vector<BlockDesc *> &v) const {
+    std::vector<int> blocks_idx;
+    for (auto blk : v) {
+      blocks_idx.push_back(blk->ID());
+    }
+    VectorToRepeated(blocks_idx, attr_->mutable_blocks_idx());
+  }
   void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); }
   void operator()(int64_t v) const { attr_->set_l(v); }
   void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); }

diff --git a/paddle/fluid/framework/op_desc.h b/paddle/fluid/framework/op_desc.h
index a02d3e26912959..74dd8ec002005d 100644
--- a/paddle/fluid/framework/op_desc.h
+++ b/paddle/fluid/framework/op_desc.h
@@ -77,6 +77,8 @@ class OpDesc {
 
   void SetBlockAttr(const std::string &name, BlockDesc *block);
 
+  void SetBlocksAttr(const std::string &name, std::vector<BlockDesc *> blocks);
+
   Attribute GetAttr(const std::string &name) const;
 
   Attribute GetNullableAttr(const std::string &name) const;

diff --git a/paddle/fluid/framework/type_defs.h b/paddle/fluid/framework/type_defs.h
index 4879209ece9fdf..e099e40f121ff1 100644
--- a/paddle/fluid/framework/type_defs.h
+++ b/paddle/fluid/framework/type_defs.h
@@ -35,7 +35,8 @@
 using VariableNameMap = std::map<std::string, std::vector<std::string>>;
 
 using Attribute =
    boost::variant<boost::blank, int, float, std::string, std::vector<int>,
                   std::vector<float>, std::vector<std::string>, bool,
-                  std::vector<bool>, BlockDesc*, int64_t>;
+                  std::vector<bool>, BlockDesc*, int64_t,
+                  std::vector<BlockDesc*>>;
 using AttributeMap = std::unordered_map<std::string, Attribute>;
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 4f364667630e7b..a5bc5efaa623b3 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -101,14 +101,11 @@ void ListenAndServOp::RunSyncLoop(
     framework::Scope *recv_scope,
     const std::vector<int> &prefetch_block_id_list) const {
   size_t num_blocks = program->Size();
-  auto skip_sub_blks = Attr<std::vector<int>>("skip_sub_blks");
+  auto optimize_blocks =
+      Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
-  std::vector<int> optimize_block_id_list;
-  for (auto *block : optimize_blocks) {
-    optimize_block_id_list.push_back(block->ID());
-  }
   auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list);
   // Insert placeholder for block0 which holds current op itself.
   optimize_prepared.insert(
       optimize_prepared.begin(),
@@ -136,10 +133,10 @@ void ListenAndServOp::RunSyncLoop(
   std::vector<size_t> parallel_blkids;
   parallel_blkids.push_back(optimize_blocks[0]->ID());
   double ts = GetTimestamp();
-  for (size_t i = 1; i < optimize_block_id_list.size(); ++i) {
+  for (size_t i = 1; i < optimize_blocks.size(); ++i) {
     // skip the first optimize block because it is already in the
     // parallel_blkids.
-    int blkid = optimize_block_id_list[i];
+    int blkid = optimize_blocks[i]->ID();
     if (program->Block(blkid).Parent() != last_parent_blkid) {
       ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
                             program, recv_scope);
@@ -261,7 +258,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
       Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
   PADDLE_ENFORCE(optimize_blocks.size() > 1,
                  "there should be at least 1 optimize block on the pserver side.");
-  auto *program = optimize_block[0]->Program();
+  auto *program = optimize_blocks[0]->Program();
   framework::Executor executor(dev_place);
 
   // prepare for prefetch
@@ -338,8 +335,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
                                       "a map from grad name to it's optimize block id")
         .SetDefault({});
     AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
-    AddAttr<framework::BlockDesc *>(kOptimizeBlocks,
-                                    "Optimize blocks to run on server side.");
+    AddAttr<std::vector<framework::BlockDesc *>>(
+        kOptimizeBlocks, "Optimize blocks to run on server side.");
     AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
                                       "prefetch blocks to run on server side.")
         .SetDefault({});

diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc
index bcf6d4dd308706..2d44e1f63cbd77 100644
--- a/paddle/fluid/pybind/protobuf.cc
+++ b/paddle/fluid/pybind/protobuf.cc
@@ -293,6 +293,7 @@ void BindOpDesc(pybind11::module *m) {
       .def("set_attr", &pd::OpDesc::SetAttr)
       .def("attr", &pd::OpDesc::GetAttr)
       .def("set_block_attr", &pd::OpDesc::SetBlockAttr)
+      .def("set_blocks_attr", &pd::OpDesc::SetBlocksAttr)
       .def("set_serialized_attr",
            [](pd::OpDesc &self, const std::string &name,
               const pybind11::bytes &seriralized) {

diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index db21b1f3c03c40..1843072662541a 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -561,6 +561,10 @@ def find_name(var_list, name):
                 if isinstance(self.attrs[attr_name], Block):
                     self.desc.set_block_attr(attr_name,
                                              self.attrs[attr_name].desc)
+                elif isinstance(self.attrs[attr_name], list) and \
+                        all(isinstance(v, Block) for v in self.attrs[attr_name]):
+                    self.desc.set_blocks_attr(
+                        attr_name, [v.desc for v in self.attrs[attr_name]])
                 elif isinstance(self.attrs[attr_name], core.BlockDesc) or \
                         isinstance(self.attrs[attr_name], core.ProgramDesc):
                     self.desc.set_serialized_attr(
@@ -715,6 +719,8 @@ def set_attr(self, name, val):
         self.attrs[name] = val
         if isinstance(val, Block):
             self.desc.set_block_attr(name, val.desc)
+        elif isinstance(val, list) and all(isinstance(v, Block) for v in val):
+            self.desc.set_blocks_attr(name, [v.desc for v in val])
         elif isinstance(val, core.BlockDesc) or \
                 isinstance(val, core.ProgramDesc):
             self.desc.set_serialized_attr(name, val.serialize_to_string())

diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 391dddcf3e9c4f..676079144ebde4 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -396,7 +396,7 @@ def __op_have_grad_input__(op):
                     return varname
             return ""
 
-        def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks):
+        def __clone_lr_op_sub_block__(op, program, new_block):
             if not op.has_attr('sub_block'):
                 return
 
@@ -406,7 +406,6 @@ def __clone_lr_op_sub_block__(op, program, new_block):
             # we put the new sub block to new block to follow the block
             # hierarchy of the original blocks
             new_sub_block = program.create_block(new_block.idx)
-            skip_sub_blks.append(new_sub_block.idx)
 
             # clone vars
             for var in origin_block.vars:
@@ -415,8 +414,7 @@ def __clone_lr_op_sub_block__(op, program, new_block):
             for op in origin_block.ops:
                 self._clone_lr_op(program, new_sub_block, op)
                 # clone sub_block of op
-                __clone_lr_op_sub_block__(op, program, new_sub_block,
-                                          skip_sub_blks)
+                __clone_lr_op_sub_block__(op, program, new_sub_block)
 
             # reset the block of op
             op.set_attr('sub_block', new_sub_block)
@@ -429,8 +427,7 @@ def __clone_lr_op_sub_block__(op, program, new_block):
             for _, op in enumerate(lr_ops):
                 self._append_pserver_non_opt_ops(lr_decay_block, op)
                 # append sub blocks to pserver_program in lr_decay_op
-                __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block,
-                                          skip_sub_blks)
+                __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block)
 
         # append op to the current block
         grad_to_block_id = []
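Note: patch 06 plumbs a new attribute type through the whole stack: a repeated `blocks_idx` field in the OpDesc proto, a `std::vector<BlockDesc*>` alternative in the `Attribute` variant, `OpDesc::SetBlocksAttr`, a `set_blocks_attr` pybind binding, and a Python-side dispatch. A pure-Python sketch of that dispatch (the desc/Block classes are faked here so the example is self-contained; this is not the Fluid API):

    class FakeBlockDesc(object):
        def __init__(self, idx):
            self.idx = idx

    class FakeBlock(object):
        def __init__(self, idx):
            self.desc = FakeBlockDesc(idx)

    class FakeOpDesc(object):
        def __init__(self):
            self.block_attrs, self.blocks_attrs = {}, {}
        def set_block_attr(self, name, block_desc):
            self.block_attrs[name] = block_desc
        def set_blocks_attr(self, name, block_descs):
            # the C++ side serializes these as the repeated blocks_idx field
            self.blocks_attrs[name] = [d.idx for d in block_descs]

    Block = FakeBlock  # what framework.py's isinstance check looks for

    def set_op_attr(desc, name, val):
        if isinstance(val, Block):
            desc.set_block_attr(name, val.desc)                # BLOCK attr
        elif isinstance(val, list) and all(isinstance(v, Block) for v in val):
            desc.set_blocks_attr(name, [v.desc for v in val])  # new BLOCKS attr
        # other attribute types fall through to plain set_attr in Fluid

    d = FakeOpDesc()
    set_op_attr(d, 'optimize_blocks', [Block(1), Block(2)])
    assert d.blocks_attrs['optimize_blocks'] == [1, 2]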
From 54d13e298c1a2cf1f2d60fc55f8813bf4f324315 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Fri, 22 Jun 2018 13:13:21 +0800
Subject: [PATCH 07/11] fix compile error

---
 paddle/fluid/operators/listen_and_serv_op.cc | 6 +++++-
 paddle/fluid/operators/send_recv_op_test.cc  | 5 ++++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index a5bc5efaa623b3..2e9d2150c676d6 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -106,7 +106,11 @@ void ListenAndServOp::RunSyncLoop(
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
-  auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list);
+  std::vector<int> optimize_blocks_idx;
+  for (auto blk : optimize_blocks) {
+    optimize_blocks_idx.push_back(blk->ID());
+  }
+  auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx);
   // Insert placeholder for block0 which holds current op itself.
   optimize_prepared.insert(
       optimize_prepared.begin(),

diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc
index e550552b195b76..aee6180add5708 100644
--- a/paddle/fluid/operators/send_recv_op_test.cc
+++ b/paddle/fluid/operators/send_recv_op_test.cc
@@ -129,7 +129,10 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
   // sub program run in listen_and_serv_op, for simple test we use sum
   f::ProgramDesc program;
   const auto &root_block = program.Block(0);
+  std::vector<f::BlockDesc *> optimize_blocks;
   auto *optimize_block = program.AppendBlock(root_block);
+  optimize_blocks.push_back(optimize_block);
+
   auto *prefetch_block = program.AppendBlock(root_block);
   // X for server side tensors, RX for received tensors, must be of same shape.
   AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block,
@@ -139,7 +142,7 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
   attrs.insert({"Fanin", 1});
   attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
   attrs.insert({"GradList", std::vector<std::string>({"x1"})});
-  attrs.insert({"OptimizeBlock", optimize_block});
+  attrs.insert({"optimize_blocks", optimize_blocks});
   attrs.insert({"PrefetchBlock", prefetch_block});
   attrs.insert({"grad_to_block_id", std::vector<std::string>({""})});
   attrs.insert({"sync_mode", true});
From 7e6e0c7b9fde53f0e5fbe8ef8484aef3bb944e6a Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Fri, 22 Jun 2018 15:42:46 +0800
Subject: [PATCH 08/11] fix unit tests

---
 paddle/fluid/operators/listen_and_serv_op.cc |  2 +-
 python/paddle/fluid/framework.py             | 22 +++++++++++-----------
 python/paddle/fluid/layers/io.py             |  6 +++---
 3 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 2e9d2150c676d6..d7e97e2a4d0770 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -260,7 +260,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
 
   auto optimize_blocks =
       Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
-  PADDLE_ENFORCE(optimize_blocks.size() > 1,
+  PADDLE_ENFORCE(optimize_blocks.size() >= 1,
                  "there should be at least 1 optimize block on the pserver side.");
   auto *program = optimize_blocks[0]->Program();
   framework::Executor executor(dev_place);

diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 1843072662541a..9f307f6cb404f4 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -558,19 +558,20 @@ def find_name(var_list, name):
                 if (attr_name not in self.attrs) or (
                         self.attrs[attr_name] is None):
                     continue
-                if isinstance(self.attrs[attr_name], Block):
+                attr_val = self.attrs[attr_name]
+                if isinstance(attr_val, Block):
                     self.desc.set_block_attr(attr_name,
                                              self.attrs[attr_name].desc)
-                elif isinstance(self.attrs[attr_name], list) and \
-                        all(isinstance(v, Block) for v in self.attrs[attr_name]):
-                    self.desc.set_blocks_attr(
-                        attr_name, [v.desc for v in self.attrs[attr_name]])
-                elif isinstance(self.attrs[attr_name], core.BlockDesc) or \
-                        isinstance(self.attrs[attr_name], core.ProgramDesc):
+                elif isinstance(attr_val, list) and attr_val and \
+                        all(isinstance(v, Block) for v in attr_val):
+                    self.desc.set_blocks_attr(attr_name,
+                                              [v.desc for v in attr_val])
+                elif isinstance(attr_val, core.BlockDesc) or \
+                        isinstance(attr_val, core.ProgramDesc):
                     self.desc.set_serialized_attr(
-                        attr_name, self.attrs[attr_name].serialize_to_string())
+                        attr_name, attr_val.serialize_to_string())
                 else:
-                    self.desc.set_attr(attr_name, self.attrs[attr_name])
+                    self.desc.set_attr(attr_name, attr_val)
         self.desc.check_attrs()
         if self.has_kernel(type):
             self.desc.infer_var_type(self.block.desc)
@@ -719,7 +720,8 @@ def set_attr(self, name, val):
         self.attrs[name] = val
         if isinstance(val, Block):
             self.desc.set_block_attr(name, val.desc)
-        elif isinstance(val, list) and all(isinstance(v, Block) for v in val):
+        elif isinstance(val, list) and val and all(
+                isinstance(v, Block) for v in val):
            self.desc.set_blocks_attr(name, [v.desc for v in val])
         elif isinstance(val, core.BlockDesc) or \
                 isinstance(val, core.ProgramDesc):
             self.desc.set_serialized_attr(name, val.serialize_to_string())

diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 8d153b75cd4995..f3ab47c96b1caa 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -186,7 +186,6 @@ def complete_op(self):
         main_program = self.helper.main_program
         current_block = main_program.current_block()
         parent_block = self.parent_block()
-        empty_block = Program().global_block()
 
         parent_block.append_op(
             type='listen_and_serv',
@@ -195,8 +194,9 @@ def complete_op(self):
             attrs={
                 'endpoint': self.endpoint,
                 'Fanin': self.fan_in,
-                'OptimizeBlock': current_block,
-                'PrefetchBlock': empty_block,
+                'optimize_blocks': [
+                    current_block
+                ],  # did not support multiple optimize blocks in layers
                 'sync_mode': True,  # did not support async now in layers
                 'grad_to_block_id': [""]
             })
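Note: the extra truthiness check patch 08 adds in framework.py matters because Python's all() is vacuously true over an empty iterable, so an empty list attribute (for example a [] default for an ints attribute) would otherwise be mis-dispatched to set_blocks_attr. A self-contained illustration:

    class Block(object):
        pass

    def looks_like_blocks_v1(val):  # pre-fix predicate
        return isinstance(val, list) and all(isinstance(v, Block) for v in val)

    def looks_like_blocks_v2(val):  # post-fix predicate
        return isinstance(val, list) and bool(val) and \
            all(isinstance(v, Block) for v in val)

    assert looks_like_blocks_v1([]) is True       # [] misclassified as Blocks
    assert looks_like_blocks_v2([]) is False      # correctly rejected
    assert looks_like_blocks_v2([Block()]) is True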
From 2231cfe2d50d279d07de8e84cff49431aee917c0 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Fri, 22 Jun 2018 17:05:03 +0800
Subject: [PATCH 09/11] add blocks attr type in proto

---
 paddle/fluid/framework/framework.proto       | 1 +
 paddle/fluid/operators/listen_and_serv_op.cc | 3 ++-
 paddle/fluid/pybind/protobuf.cc              | 3 ++-
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto
index 8f73b3d478e6da..2cf14bd371831a 100644
--- a/paddle/fluid/framework/framework.proto
+++ b/paddle/fluid/framework/framework.proto
@@ -27,6 +27,7 @@ enum AttrType {
   BOOLEANS = 7;
   BLOCK = 8;
   LONG = 9;
+  BLOCKS = 10;
 }
 
 // OpDesc describes an instance of a C++ framework::OperatorBase

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index d7e97e2a4d0770..4ea2c3e0554c6b 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -340,7 +340,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
         .SetDefault({});
     AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
     AddAttr<std::vector<framework::BlockDesc *>>(
-        kOptimizeBlocks, "Optimize blocks to run on server side.");
+        kOptimizeBlocks, "Optimize blocks to run on server side.")
+        .SetDefault({});
     AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
                                       "prefetch blocks to run on server side.")
         .SetDefault({});

diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc
index 2d44e1f63cbd77..fcd3356d44ee59 100644
--- a/paddle/fluid/pybind/protobuf.cc
+++ b/paddle/fluid/pybind/protobuf.cc
@@ -268,7 +268,8 @@ void BindOpDesc(pybind11::module *m) {
       .value("STRINGS", pd::proto::AttrType::STRINGS)
      .value("BOOL", pd::proto::AttrType::BOOLEAN)
       .value("BOOLS", pd::proto::AttrType::BOOLEANS)
-      .value("BLOCK", pd::proto::AttrType::BLOCK);
+      .value("BLOCK", pd::proto::AttrType::BLOCK)
+      .value("BLOCKS", pd::proto::AttrType::BLOCKS);
 
   pybind11::class_<pd::OpDesc> op_desc(*m, "OpDesc", "");
   op_desc

From d01fb103b603e0afcf501e6af1a61a984ad711d6 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Sun, 24 Jun 2018 12:18:22 +0800
Subject: [PATCH 10/11] fix cloned op

---
 .../fluid/transpiler/distribute_transpiler.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 676079144ebde4..f003992f3c3e60 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -396,7 +396,7 @@ def __op_have_grad_input__(op):
                     return varname
             return ""
 
-        def __clone_lr_op_sub_block__(op, program, new_block):
+        def __clone_lr_op_sub_block__(op, program, lr_block):
             if not op.has_attr('sub_block'):
                 return
 
@@ -405,17 +405,17 @@ def __clone_lr_op_sub_block__(op, program, lr_block):
             assert isinstance(origin_block, Block)
             # we put the new sub block to new block to follow the block
             # hierarchy of the original blocks
-            new_sub_block = program.create_block(new_block.idx)
+            new_sub_block = program.create_block(lr_block.idx)
 
             # clone vars
             for var in origin_block.vars:
                 new_sub_block.clone_variable(var)
 
             # clone ops
-            for op in origin_block.ops:
-                self._clone_lr_op(program, new_sub_block, op)
+            for origin_op in origin_block.ops:
+                cloned_op = self._clone_lr_op(program, new_sub_block, origin_op)
                 # clone sub_block of op
-                __clone_lr_op_sub_block__(op, program, new_sub_block)
+                __clone_lr_op_sub_block__(cloned_op, program, new_sub_block)
 
             # reset the block of op
             op.set_attr('sub_block', new_sub_block)
@@ -429,9 +429,10 @@ def __clone_lr_op_sub_block__(op, program, lr_block):
                 pserver_program.num_blocks - 1)
             optimize_blocks.append(lr_decay_block)
             for _, op in enumerate(lr_ops):
-                self._append_pserver_non_opt_ops(lr_decay_block, op)
+                cloned_op = self._append_pserver_non_opt_ops(lr_decay_block, op)
                 # append sub blocks to pserver_program in lr_decay_op
-                __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block)
+                __clone_lr_op_sub_block__(cloned_op, pserver_program,
+                                          lr_decay_block)
 
         # append op to the current block
         grad_to_block_id = []
@@ -1214,7 +1215,7 @@ def _clone_lr_op(self, program, block, op):
             if var not in program.global_block().vars:
                 block.clone_variable(var)
 
-        block.append_op(
+        return block.append_op(
             type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs)
 
     def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
@@ -1252,7 +1253,7 @@ def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
             elif not program.global_block().vars.has_key(var.name):
                 program.global_block().clone_variable(var)
 
-        optimize_block.append_op(
+        return optimize_block.append_op(
             type=opt_op.type,
             inputs=inputs,
             outputs=outputs,

From fa25aa92d8c327a0773e5e9b0e6775579ed5cea9 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Tue, 26 Jun 2018 17:26:27 +0800
Subject: [PATCH 11/11] revert transpiler rst

---
 doc/fluid/api/transpiler.rst | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/doc/fluid/api/transpiler.rst b/doc/fluid/api/transpiler.rst
index 964ce22d4b0ce2..6b33e96b1c76c1 100644
--- a/doc/fluid/api/transpiler.rst
+++ b/doc/fluid/api/transpiler.rst
@@ -1,6 +1,11 @@
 ..  THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
     !DO NOT EDIT THIS FILE MANUALLY!
 
+================
+fluid.transpiler
+================
+
+.. _api_fluid_transpiler_DistributeTranspiler:
 
 DistributeTranspiler
 --------------------
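Note: the series' last functional change, patch 10 above, is easy to miss: the clone helpers now return the op they append, and the recursion walks the cloned op instead of the original, so the sub_block rewiring no longer mutates the trainer-side program. Reduced to plain Python (toy Op class, hypothetical field names), the difference is:

    class Op(object):
        def __init__(self, name, sub_block=None):
            self.name, self.sub_block = name, sub_block

    def clone_wrong(op):
        cloned = Op(op.name, op.sub_block)
        op.sub_block = 'cloned-' + op.sub_block          # mutates the original!
        return cloned

    def clone_right(op):
        cloned = Op(op.name, op.sub_block)
        cloned.sub_block = 'cloned-' + cloned.sub_block  # rewires only the clone
        return cloned

    a = Op('lr_decay', 'blk1')
    assert clone_wrong(a).sub_block == 'blk1' and a.sub_block == 'cloned-blk1'
    b = Op('lr_decay', 'blk1')
    assert clone_right(b).sub_block == 'cloned-blk1' and b.sub_block == 'blk1'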