From 1ab4fcb5e705c5e03c0fea3fbca3c00b5e67ff85 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 26 Mar 2018 19:55:46 +0800 Subject: [PATCH 1/3] prepare pserver executor --- paddle/fluid/framework/executor.cc | 15 +++++++++++++ paddle/fluid/framework/executor.h | 3 +++ paddle/fluid/operators/listen_and_serv_op.cc | 22 ++++++++++++-------- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 0b171e1dcfa90c..5279eb42cd5c1f 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -279,6 +279,21 @@ std::unique_ptr Executor::Prepare( return std::unique_ptr(ctx); } +std::vector> Prepare( + const ProgramDesc& program, const std::vector& block_ids) { + std::vector> result; + for (auto& bid : block_ids) { + auto* ctx = new ExecutorPrepareContext(program, bid); + PADDLE_ENFORCE_LT(static_cast(bid), program.Size()); + auto& block = program.Block(bid); + for (auto& op_desc : block.AllOps()) { + ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc)); + } + result.push_back(std::shared_ptr(ctx)); + } + return result; +} + void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, bool create_local_scope, bool create_vars) { auto& block = ctx->prog_.Block(ctx->block_id_); diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index d8dd82469af06a..756f3c7e5ad596 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -60,6 +60,9 @@ class Executor { static std::unique_ptr Prepare( const ProgramDesc& program, int block_id); + static std::vector> Prepare( + const ProgramDesc& program, const std::vector& block_ids); + void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, bool create_local_scope = true, bool create_vars = true); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 08b83375dd5462..6bae993f6127bb 
100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -93,6 +93,10 @@ class ListenAndServOp : public framework::OperatorBase { "server program should have at least 2 blocks"); framework::Executor executor(dev_place); + std::vector block_list; + for (int blkid = 1; blkid < num_blocks; ++blkid) + block_list.push_back(blkid); + auto prepared = executor.Prepare(*program, block_list); // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; @@ -143,11 +147,12 @@ class ListenAndServOp : public framework::OperatorBase { std::vector> fs; // block0 contains only listen_and_serv op, start run from block1. for (int blkid = 1; blkid < num_blocks - 1; ++blkid) { - fs.push_back( - framework::Async([&executor, &program, &recv_scope, blkid]() { + fs.push_back(framework::Async( + [&executor, &program, &recv_scope, &prepared, blkid]() { int run_block = blkid; // thread local try { - executor.Run(*program, &recv_scope, run_block, false, false); + executor.RunPreparedContext(prepared[run_block].get(), + &recv_scope, false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -157,7 +162,9 @@ class ListenAndServOp : public framework::OperatorBase { // Run global block at final step, or block1 if there are only 2 blocks if (num_blocks >= 2) { try { - executor.Run(*program, &recv_scope, num_blocks - 1, false, false); + // executor.Run(program, &recv_scope, num_blocks - 1, false, false); + executor.RunPreparedContext(prepared[num_blocks - 1].get(), + &recv_scope, false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -172,14 +179,11 @@ class ListenAndServOp : public framework::OperatorBase { var->GetMutable()->mutable_rows()->clear(); } rpc_service_->SetCond(1); - // FIXME(typhoonzero): use another condition to sync wait clients get. 
+ // NOTE: does not consider barrier request retry in here, we may use + // global barrier id to resolve this. rpc_service_->WaitClientGet(fan_in); sparse_vars.clear(); } // while(true) - - // for (int i = 0; i < num_blocks; ++i) { - // delete blk_ctx_list[i]; - // } } protected: From 75bfdb3a3cfc526dcb5eb664ffcf2d20dd932c3c Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 2 Apr 2018 17:27:51 +0800 Subject: [PATCH 2/3] refine --- paddle/fluid/framework/executor.cc | 2 +- paddle/fluid/operators/listen_and_serv_op.cc | 42 ++++++++++++-------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 96d9b49c874ad7..16a118090ba9cf 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -279,7 +279,7 @@ std::unique_ptr Executor::Prepare( return std::unique_ptr(ctx); } -std::vector> Prepare( +std::vector> Executor::Prepare( const ProgramDesc& program, const std::vector& block_ids) { std::vector> result; for (auto& bid : block_ids) { diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 6094c066f94512..d4b0fa3aa18d61 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -54,20 +54,24 @@ static void CreateTensorFromMessageType(framework::Variable *var, } } -static void ParallelExecuteBlocks(const std::vector &parallel_blkids, - framework::Executor *executor, - framework::ProgramDesc *program, - framework::Scope *scope) { +static void ParallelExecuteBlocks( + const std::vector &parallel_blkids, framework::Executor *executor, + const std::vector> + &prepared, + framework::ProgramDesc *program, framework::Scope *scope) { std::vector> fs; for (size_t idx : parallel_blkids) { - fs.push_back(framework::Async([&executor, &program, &scope, idx]() { - int run_block = idx; // thread local - try { - executor->Run(*program, scope, run_block, false, false); 
- } catch (std::exception &e) { - LOG(ERROR) << "run sub program error " << e.what(); - } - })); + fs.push_back( + framework::Async([&executor, &prepared, &program, &scope, idx]() { + int run_block = idx; // thread local + try { + // executor->Run(*program, scope, run_block, false, false); + executor->RunPreparedContext(prepared[run_block].get(), scope, + false, false); + } catch (std::exception &e) { + LOG(ERROR) << "run sub program error " << e.what(); + } + })); } for (size_t i = 0; i < fs.size(); ++i) fs[i].wait(); } @@ -105,15 +109,18 @@ class ListenAndServOp : public framework::OperatorBase { auto *block = Attr(kOptimizeBlock); auto *program = block->Program(); - int num_blocks = program->Size(); + size_t num_blocks = program->Size(); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); framework::Executor executor(dev_place); std::vector block_list; - for (int blkid = 1; blkid < num_blocks; ++blkid) + for (size_t blkid = 1; blkid < num_blocks; ++blkid) block_list.push_back(blkid); auto prepared = executor.Prepare(*program, block_list); + prepared.insert( + prepared.begin(), + std::shared_ptr(nullptr)); // TODO(typhoonzero): change this to a while_op for every cluster-batch. 
bool exit_flag = false; @@ -161,21 +168,22 @@ class ListenAndServOp : public framework::OperatorBase { // The optimize blocks which have the same parent ID would run parallel // TODO(Yancey1989): need to use ParallelExecutor for future - size_t last_parent_blkid = program->Block(1).Parent(); + int32_t last_parent_blkid = program->Block(1).Parent(); std::vector parallel_blkids; parallel_blkids.push_back(1); double ts = detail::GetTimestamp(); for (size_t blkid = 2; blkid < num_blocks; ++blkid) { if (program->Block(blkid).Parent() != last_parent_blkid) { for (size_t idx : parallel_blkids) VLOG(3) << idx; - ParallelExecuteBlocks(parallel_blkids, &executor, program, + ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program, &recv_scope); parallel_blkids.clear(); last_parent_blkid = program->Block(blkid).Parent(); } parallel_blkids.push_back(blkid); } - ParallelExecuteBlocks(parallel_blkids, &executor, program, &recv_scope); + ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program, + &recv_scope); VLOG(2) << "run all blocks spent (ms) " << detail::GetTimestamp() - ts; From 44c29abdbf76ddaa8c0fb9fc4a060f034b8d4a13 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 3 Apr 2018 17:40:47 +0800 Subject: [PATCH 3/3] remove comments --- paddle/fluid/operators/listen_and_serv_op.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index d4b0fa3aa18d61..11eab6f78f9834 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -65,7 +65,6 @@ static void ParallelExecuteBlocks( framework::Async([&executor, &prepared, &program, &scope, idx]() { int run_block = idx; // thread local try { - // executor->Run(*program, scope, run_block, false, false); executor->RunPreparedContext(prepared[run_block].get(), scope, false, false); } catch (std::exception &e) {