Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Update record interface using part1 #39693

Merged
Merged
23 changes: 16 additions & 7 deletions paddle/fluid/distributed/ps/service/brpc_ps_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ void BrpcPsService::service(google::protobuf::RpcController *cntl_base,
int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_dense");
platform::RecordEvent record_event(
"PsService->pull_dense", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
if (request.params_size() < 1) {
set_response_code(
Expand Down Expand Up @@ -219,7 +220,9 @@ int32_t BrpcPsService::push_dense_param(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_dense_param");
platform::RecordEvent record_event("PsService->push_dense_param",
platform::TracerEventType::Communication,
1);
CHECK_TABLE_EXIST(table, request, response)
thread_local std::string push_buffer;
auto &req_io_buffer = cntl->request_attachment();
Expand All @@ -245,7 +248,8 @@ int32_t BrpcPsService::push_dense_param(Table *table,
int32_t BrpcPsService::push_dense(Table *table, const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_dense");
platform::RecordEvent record_event(
"PsService->push_dense", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
auto req_buffer_size = request.data().size();
if (req_buffer_size < 1) {
Expand Down Expand Up @@ -291,7 +295,9 @@ int32_t BrpcPsService::push_sparse_param(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_sparse_param");
platform::RecordEvent record_event("PsService->push_sparse_param",
platform::TracerEventType::Communication,
1);
CHECK_TABLE_EXIST(table, request, response)
auto &push_data = request.data();
if (push_data.size() < 1) {
Expand Down Expand Up @@ -323,7 +329,8 @@ int32_t BrpcPsService::pull_geo_param(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_geo_param");
platform::RecordEvent record_event(
"PsService->pull_geo_param", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
thread_local std::string push_sparse_request_buffer;

Expand All @@ -346,7 +353,8 @@ int32_t BrpcPsService::pull_sparse(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_sparse");
platform::RecordEvent record_event(
"PsService->pull_sparse", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)

auto &req_io_buffer = cntl->request_attachment();
Expand Down Expand Up @@ -392,7 +400,8 @@ int32_t BrpcPsService::push_sparse(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_sparse");
platform::RecordEvent record_event(
"PsService->push_sparse", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
auto &push_data = request.data();
if (push_data.size() < 1) {
Expand Down
51 changes: 38 additions & 13 deletions paddle/fluid/distributed/ps/service/communicator/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ int Communicator::SetClients(std::vector<uint64_t> &host_sign_list) {

void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,
int table_id, Scope *scope) {
platform::RecordEvent record_event("Communicator->RpcRecvDense");
platform::RecordEvent record_event("Communicator->RpcRecvDense",
platform::TracerEventType::Communication,
1);
std::vector<paddle::distributed::Region> regions;
regions.reserve(varnames.size());
for (auto &t : varnames) {
Expand Down Expand Up @@ -169,7 +171,9 @@ void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,

void Communicator::RpcSendDenseParam(const std::vector<std::string> &varnames,
int table_id, const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendDenseParam");
platform::RecordEvent record_event("Communicator->RpcSendDenseParam",
platform::TracerEventType::Communication,
1);
auto place = platform::CPUPlace();
std::vector<paddle::distributed::Region> regions;
for (auto &t : varnames) {
Expand Down Expand Up @@ -206,7 +210,9 @@ void Communicator::RpcSendDenseParam(const std::vector<std::string> &varnames,
}

void Communicator::RpcSendDense(const CommContext &ctx, const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendDense");
platform::RecordEvent record_event("Communicator->RpcSendDense",
platform::TracerEventType::Communication,
1);
auto &var_names = ctx.origin_varnames;
auto &table_id = ctx.table_id;
auto dense_data = std::make_shared<std::vector<float>>();
Expand Down Expand Up @@ -250,7 +256,9 @@ void Communicator::RpcSendDense(const CommContext &ctx, const Scope &scope) {

void Communicator::RpcSendSparseParam(const std::string &varname, int table_id,
const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendSparseParam");
platform::RecordEvent record_event("Communicator->RpcSendSparseParam",
platform::TracerEventType::Communication,
1);
size_t request_call_num = _worker_ptr->get_server_nums();
std::vector<float *> push_g_vec;

Expand Down Expand Up @@ -287,7 +295,9 @@ void Communicator::RpcSendSparseParam(const std::string &varname, int table_id,

void Communicator::RpcSendSparse(const std::string &var_name, int table_id,
const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendSparse");
platform::RecordEvent record_event("Communicator->RpcSendSparse",
platform::TracerEventType::Communication,
1);
size_t request_call_num = _worker_ptr->get_server_nums();
std::vector<uint64_t> sparse_push_keys;
std::vector<float *> push_g_vec;
Expand Down Expand Up @@ -338,7 +348,9 @@ void Communicator::RpcSendSparse(const std::string &var_name, int table_id,

void Communicator::RpcRecvSparse(const std::string &varname, int table_id,
Scope *scope) {
platform::RecordEvent record_event("Communicator->RpcRecvSparse");
platform::RecordEvent record_event("Communicator->RpcRecvSparse",
platform::TracerEventType::Communication,
1);
auto *send_var = scope->Var(varname);
auto *tensor = send_var->GetMutable<framework::LoDTensor>();
auto dim = tensor->dims()[1];
Expand Down Expand Up @@ -406,7 +418,9 @@ void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
if (batches == 0) {
return;
}
platform::RecordEvent record_event("Communicator->SendGlobalStep");
platform::RecordEvent record_event("Communicator->SendGlobalStep",
platform::TracerEventType::Communication,
1);
auto &table_id = ctx.table_id;
size_t request_call_num = _worker_ptr->get_server_nums();

Expand Down Expand Up @@ -994,7 +1008,8 @@ void SyncCommunicator::BarrierRecv() {

void GeoCommunicator::Send(const std::vector<std::string> &var_names,
const framework::Scope &scope) {
platform::RecordEvent record_event("GeoCommunicator->Send");
platform::RecordEvent record_event(
"GeoCommunicator->Send", platform::TracerEventType::Communication, 1);
waiting_ = false;
auto before_send = GetCurrentUS();
auto table_name = var_names[0];
Expand Down Expand Up @@ -1137,7 +1152,9 @@ void GeoCommunicator::InitDense(std::vector<std::string> &varnames,
}

void GeoCommunicator::SendDense(const CommContext &send_ctx) {
platform::RecordEvent record_event("GeoCommunicator->SendDense");
platform::RecordEvent record_event("GeoCommunicator->SendDense",
platform::TracerEventType::Communication,
1);
auto &var_names = send_ctx.origin_varnames;
auto &table_id = send_ctx.table_id;
for (auto &varname : var_names) {
Expand Down Expand Up @@ -1177,7 +1194,9 @@ void GeoCommunicator::SendDense(const CommContext &send_ctx) {
}

void GeoCommunicator::RecvDense(const CommContext &send_ctx) {
platform::RecordEvent record_event("GeoCommunicator->RecvDense");
platform::RecordEvent record_event("GeoCommunicator->RecvDense",
platform::TracerEventType::Communication,
1);
auto &table_id = send_ctx.table_id;
auto &varnames = recv_varname_to_ctx_.at(table_id);
// 1. recv from pserver
Expand Down Expand Up @@ -1235,7 +1254,9 @@ void GeoCommunicator::InitSparse(const std::string &var_name, int table_id) {

std::vector<int64_t> GeoCommunicator::MergeSparseIds(
const std::string &send_varname) {
platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds");
platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds",
platform::TracerEventType::Communication,
1);
size_t merge_num = 0, wait_times = 0;
std::unordered_set<int64_t> sparse_ids;
while (merge_num < static_cast<size_t>(max_merge_var_num_)) {
Expand Down Expand Up @@ -1267,7 +1288,9 @@ std::vector<int64_t> GeoCommunicator::MergeSparseIds(
void GeoCommunicator::SendSparse(const std::string &varname,
std::vector<int64_t> &sparse_ids, int table_id,
int ep_idx) {
platform::RecordEvent record_event("GeoCommunicator->SendSparse");
platform::RecordEvent record_event("GeoCommunicator->SendSparse",
platform::TracerEventType::Communication,
1);
if (sparse_ids.size() == 0) {
return;
}
Expand Down Expand Up @@ -1342,7 +1365,9 @@ void GeoCommunicator::SendSparse(const std::string &varname,

void GeoCommunicator::RecvSparse(const std::string &varname, int table_id,
int ep_idx) {
platform::RecordEvent record_event("GeoCommunicator->RecvSparse");
platform::RecordEvent record_event("GeoCommunicator->RecvSparse",
platform::TracerEventType::Communication,
1);
// 1. recv from pserver
std::vector<uint64_t> keys;
std::vector<float> values;
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/ps/service/heter_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#include "paddle/fluid/distributed/ps/service/heter_client.h"

#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/split.h"
Expand Down Expand Up @@ -152,7 +151,9 @@ void HeterClient::SendAndRecvAsync(
const std::string& message_name,
const std::vector<std::string>& send_var_name,
const std::vector<std::string>& recv_var_name, const std::string& mode) {
platform::RecordEvent record_event("HeterClient->SendAndRecvAsync");
platform::RecordEvent record_event("HeterClient->SendAndRecvAsync",
platform::TracerEventType::Communication,
1);
const platform::DeviceContext* p_ctx = &ctx;
const framework::Scope* p_scope = &scope;
const std::string message_name_val = message_name;
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/distributed/ps/service/heter_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ class RequestSendAndRecvHandler final : public HeterRequestHandler {

int Handle(const MultiVarMsg* request, MultiVarMsg* response,
brpc::Controller* cntl) override {
platform::RecordEvent record_event("RequestSendAndRecvHandler->Handle");
platform::RecordEvent record_event("RequestSendAndRecvHandler->Handle",
platform::TracerEventType::Communication,
1);
FLAGS_eager_delete_tensor_gb = -1;

// get microID from request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "iomanip"
#include "paddle/fluid/distributed/ps/table/table.h"
#include "paddle/fluid/framework/archive.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace distributed {
std::vector<std::string> GraphPyService::split(std::string& str,
Expand Down
6 changes: 3 additions & 3 deletions paddle/fluid/framework/details/all_reduce_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
DECLARE_bool(sync_nccl_allreduce);
Expand Down Expand Up @@ -68,8 +68,8 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
#endif

void AllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());

platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
WaitInputVarGenerated();
std::vector<VarHandleBase *> inputs = this->Inputs();
std::vector<VarHandleBase *> outputs = this->Outputs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

#if defined(PADDLE_WITH_XPU)
namespace paddle {
Expand Down
6 changes: 3 additions & 3 deletions paddle/fluid/framework/details/broadcast_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

namespace paddle {
namespace framework {
namespace details {

void BroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());

platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
if (places_.size() == 1) return;

// The input and output may have dummy vars.
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/framework/details/eager_deletion_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"

#include "paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
Expand Down Expand Up @@ -128,7 +128,8 @@ void EagerDeletionOpHandle::RunImpl() {
CallOnce();
}

platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::UserDefined, 2);
std::deque<std::shared_ptr<memory::Allocation>> garbages;
for (size_t i = 0; i < var_infos_.size(); ++i) {
auto *var_info = var_infos_[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "paddle/fluid/framework/details/fetch_async_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -65,7 +65,8 @@ FetchResultType FastThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors, bool return_merged) {
VLOG(3) << "enter FastThreadedSSAGraphExecutor Run";
std::unique_ptr<platform::RecordEvent> event(
new platform::RecordEvent("FastThreadedSSAGraphExecutorPrepare"));
new platform::RecordEvent("FastThreadedSSAGraphExecutorPrepare",
platform::TracerEventType::UserDefined, 2));
std::unique_ptr<std::unordered_map<OpHandleBase *, std::atomic<int>>>
op_deps = atomic_op_deps_.get();
PrepareAtomicOpDeps();
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/details/fetch_async_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -190,7 +191,8 @@ void FetchAsyncOpHandle::FetchMergedLodTensor(
}

void FetchAsyncOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::Operator, 1);
WaitInputVarGenerated(true);

// get src vars
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/framework/details/fetch_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <string>

#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -128,7 +128,8 @@ static void TransData(const framework::LoDTensor &src_item,
}

void FetchOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::Operator, 1);
WaitInputVarGenerated(platform::CPUPlace());

tensors_.resize(inputs_.size());
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/framework/details/fused_all_reduce_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/device_memory_aligment.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"

DEFINE_bool(skip_fused_all_reduce_check, false, "");
DECLARE_bool(allreduce_record_one_event);
Expand Down Expand Up @@ -68,7 +68,8 @@ FusedAllReduceOpHandle::~FusedAllReduceOpHandle() {
}

void FusedAllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
VLOG(4) << this->DebugString();

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
Expand Down
Loading