Commit

mod communicator
yaoxuefeng6 committed Jan 20, 2022
1 parent c0f2728 commit 3a3c9c9
Showing 4 changed files with 225 additions and 19 deletions.
177 changes: 177 additions & 0 deletions paddle/fluid/distributed/service/communicator.cc
@@ -30,6 +30,8 @@ namespace distributed {
using framework::LoDTensor;
using framework::SelectedRows;

const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100;

inline double GetCurrentUS() {
  struct timeval time;
  gettimeofday(&time, NULL);
@@ -576,6 +578,181 @@ void AsyncCommunicator::MainThread() {
  VLOG(1) << "communicator stopped, send thread exit";
}

void AsyncCommunicator::PullSparseToTensorSync(
    const uint64_t table_id, int fea_dim, uint64_t padding_id,
    platform::Place place, bool is_training,
    std::vector<const LoDTensor *> *inputs, std::vector<LoDTensor *> *outputs) {
  std::vector<uint64_t> fea_keys;
  std::vector<float *> pull_result_ptr;
  fea_keys.reserve(MAX_FEASIGN_NUM / 100);
  pull_result_ptr.reserve(MAX_FEASIGN_NUM / 100);
  std::vector<float> init_value(fea_dim, 0);
  framework::LoDTensor *output = nullptr;
  float *output_data = nullptr;
  size_t output_index = -1;
  size_t output_len = 0;
  for (size_t index = 0; index < inputs->size(); ++index) {
    const framework::LoDTensor *tensor = inputs->at(index);
    const int64_t *ids = tensor->data<int64_t>();
    size_t len = tensor->numel();
    for (size_t i = 0; i < len; ++i, output_len += fea_dim) {
      if (!output || output_len == size_t(output->numel())) {
        ++output_index;
        CHECK(output_index < outputs->size());  // NOLINT
        output = outputs->at(output_index);
        output->set_lod(tensor->lod());
        output_data = output->mutable_data<float>(place);
        output_len = 0;
        CHECK(output->numel() % fea_dim == 0);  // NOLINT
        CHECK(output_data != nullptr);          // NOLINT
      }
      uint64_t real_id = static_cast<uint64_t>(ids[i]);
      if (real_id == padding_id) {
        memcpy(output_data + output_len, init_value.data(),
               sizeof(float) * fea_dim);
        continue;
      }
      fea_keys.push_back(real_id);
      pull_result_ptr.push_back(output_data + output_len);
    }
  }
  auto status =
      _worker_ptr->pull_sparse(pull_result_ptr.data(), table_id,
                               fea_keys.data(), fea_keys.size(), is_training);
  status.wait();
  auto ret = status.get();
  if (ret != 0) {
    LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]";
    sleep(sleep_seconds_before_fail_exit_);
  }
}
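An illustrative call of the new API (not part of the commit — the table id, feature dimension, and helper name below are assumptions; the real call sites are the operator kernels changed later in this diff):

// Hypothetical helper sketch: pull embeddings for one id tensor, assuming the
// PS service is up, the communicator was created as an AsyncCommunicator, and
// `emb` has already been resized to {num_ids, 8}.
// #include "paddle/fluid/distributed/service/communicator.h"
void PullEmbeddingsSketch(const paddle::framework::LoDTensor &ids,
                          paddle::framework::LoDTensor *emb) {
  auto *comm = (paddle::distributed::AsyncCommunicator *)
      paddle::distributed::Communicator::GetInstance();
  std::vector<const paddle::framework::LoDTensor *> inputs = {&ids};
  std::vector<paddle::framework::LoDTensor *> outputs = {emb};
  comm->PullSparseToTensorSync(/*table_id=*/1, /*fea_dim=*/8, /*padding_id=*/0,
                               paddle::platform::CPUPlace(),
                               /*is_training=*/true, &inputs, &outputs);
}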

void AsyncCommunicator::PushSparseFromTensorAsync(
    const uint64_t table_id, int fea_dim, uint64_t padding_id,
    platform::Place place, std::vector<const framework::LoDTensor *> *inputs,
    const framework::LoDTensor *shows, const framework::LoDTensor *clks,
    std::vector<framework::LoDTensor *> *outputs) {
  int batch_size = -1;
  bool batch_size_consist = true;
  for (auto *input : *inputs) {
    int cur_batch_size =
        input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
    if (batch_size == -1) {
      batch_size = cur_batch_size;
    } else {
      // CHECK(batch_size == cur_batch_size);  // NOLINT
      batch_size_consist = false;
      break;
    }
  }
  CHECK(batch_size > 0);  // NOLINT

  int show_size =
      shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
  CHECK(show_size == batch_size || show_size == 1);
  int clk_size =
      clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
  CHECK(clk_size == batch_size || clk_size == 1);

  CHECK(outputs->size() == inputs->size());
  std::vector<uint64_t> push_keys;
  push_keys.reserve(MAX_FEASIGN_NUM / 100);
  std::vector<std::vector<float>> push_values;
  push_values.reserve(MAX_FEASIGN_NUM / 100);
  size_t output_len = 0;
  size_t input_idx = 0;

  VLOG(2) << "fleet.cc::emb_dim: " << fea_dim;

  // TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
  // const long int* show_tensor = shows->data<int64_t>();
  // const long int* clk_tensor = clks->data<int64_t>();
  const int64_t *show_tensor = shows->data<int64_t>();
  const int64_t *clk_tensor = clks->data<int64_t>();

  for (size_t index = 0; index < inputs->size(); ++index) {
    framework::LoDTensor *g_tensor = outputs->at(index);
    float *g = g_tensor->data<float>();
    // no cvm
    if (batch_size_consist) {  // TODO(zhaocaibei123): add config
                               // scale_sparse_gradient_with_batch_size_
      Eigen::Map<
          Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
          g_mat(g, g_tensor->numel() / fea_dim, fea_dim);
      g_mat.rightCols(fea_dim) *= batch_size;
    }

    const framework::LoDTensor *tensor = inputs->at(index);
    const int64_t *ids = tensor->data<int64_t>();
    size_t len = tensor->numel();
    output_len = 0;

    if (tensor->lod().size() > 0) {
      for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) {
        for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1];
             ++j, output_len += fea_dim) {
          uint64_t real_id = static_cast<uint64_t>(ids[j]);
          if (real_id == padding_id) {
            continue;
          }
          push_keys.emplace_back(real_id);
          push_values.emplace_back(fea_dim + 3);
          // slot show clk grad... consistent with CtrCommonPushValue defined in
          // ctr_accessor.h
          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
          push_values.back()[1] =
              (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
          push_values.back()[2] =
              (i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));

          float *data = push_values.back().data() + 3;

          memcpy(data, g + output_len, sizeof(float) * fea_dim);

          ++input_idx;
        }
      }
    } else {
      for (size_t i = 0; i < len; ++i, output_len += fea_dim) {
        uint64_t real_id = static_cast<uint64_t>(ids[i]);
        if (real_id == padding_id) {
          continue;
        }
        push_keys.emplace_back(real_id);
        push_values.emplace_back(fea_dim + 3);
        // slot show clk grad... consistent with CtrCommonPushValue defined in
        // ctr_accessor.h
        push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
        push_values.back()[1] =
            (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
        push_values.back()[2] =
            (i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));

        float *data = push_values.back().data() + 3;

        memcpy(data, g + output_len, sizeof(float) * fea_dim);

        ++input_idx;
      }
    }
    CHECK(output_len == g_tensor->numel());
  }

  std::vector<float *> push_g_vec(input_idx, nullptr);

  for (auto i = 0u; i < push_keys.size(); ++i) {
    push_g_vec[i] = push_values.at(i).data();
  }

  PADDLE_ENFORCE_EQ(
      this->Check(table_id), true,
      platform::errors::InvalidArgument(
          "can not find table: %s, please check your config", table_id));
  auto status = _worker_ptr->push_sparse(table_id, push_keys.data(),
                                         (const float **)push_g_vec.data(),
                                         push_keys.size());
}
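The fea_dim + 3 vectors assembled above follow the CtrCommonPushValue layout referenced in the in-code comment: slot, show, click, then the fea_dim gradient floats. A small standalone sketch of building one such value (the feature dimension and gradient numbers are illustrative, not from the commit):

// Builds one push value in the [slot, show, click, grad...] layout used above.
#include <cstring>
#include <vector>

int main() {
  const int fea_dim = 4;                    // illustrative embedding dimension
  std::vector<float> push_value(fea_dim + 3);
  push_value[0] = 2;  // slot (hard-coded to 2 in this commit, see the TODO)
  push_value[1] = 1;  // show count for this instance
  push_value[2] = 0;  // click count for this instance
  const float grad[fea_dim] = {0.1f, -0.2f, 0.3f, 0.05f};  // gradient slice
  std::memcpy(push_value.data() + 3, grad, sizeof(float) * fea_dim);
  return 0;
}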

void HalfAsyncCommunicator::MainThread() {
VLOG(3) << "HalfAsyncCommunicator MainThread start and wait";

13 changes: 13 additions & 0 deletions paddle/fluid/distributed/service/communicator.h
@@ -453,6 +453,18 @@ class AsyncCommunicator : public Communicator {

  void PushDensePostProcessing();

  void PullSparseToTensorSync(
      const uint64_t table_id, int fea_dim, uint64_t padding_id,
      platform::Place place, bool is_training,
      std::vector<const framework::LoDTensor *> *inputs,  // NOLINT
      std::vector<framework::LoDTensor *> *outputs);      // NOLINT

  void PushSparseFromTensorAsync(
      const uint64_t table_id, int fea_dim, uint64_t padding_id,
      platform::Place place, std::vector<const framework::LoDTensor *> *inputs,
      const framework::LoDTensor *shows, const framework::LoDTensor *clicks,
      std::vector<framework::LoDTensor *> *outputs);

 protected:
  std::unordered_map<std::string,
                     std::shared_ptr<BlockingQueue<std::shared_ptr<Variable>>>>
@@ -467,6 +479,7 @@
  bool need_global_step_ = false;
  bool independent_recv_ = true;
  int parallel_task_nums_ = 0;
  int32_t sleep_seconds_before_fail_exit_ = 300;  // initialized here; 300 mirrors FleetWrapper's default (assumption), the commit left it uninitialized

  std::unique_ptr<std::thread> main_thread_{nullptr};
  std::unique_ptr<std::thread> recv_thread_{nullptr};
21 changes: 12 additions & 9 deletions paddle/fluid/operators/pscore/distributed_lookup_table_op.h
Expand Up @@ -14,6 +14,7 @@
#include <string>
#include <vector>
#include "paddle/fluid/distributed/fleet.h"
#include "paddle/fluid/distributed/service/communicator.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/tensor_util.h"
@@ -51,13 +52,15 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
    auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
    auto outputs = context.MultiOutput<framework::LoDTensor>("Outputs");

    auto fleet = distributed::FleetWrapper::GetInstance();
    // auto fleet = distributed::FleetWrapper::GetInstance();
    auto *communicator = (distributed::AsyncCommunicator *)
        distributed::Communicator::GetInstance();

    if (platform::is_cpu_place(context.GetPlace())) {
      fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
                                    static_cast<uint64_t>(padding_idx),
                                    context.GetPlace(), !is_test, &inputs,
                                    &outputs);
      communicator->PullSparseToTensorSync(
          static_cast<uint64_t>(table_id), emb_dim,
          static_cast<uint64_t>(padding_idx), context.GetPlace(), !is_test,
          &inputs, &outputs);
    } else {
      auto inputs_variable = context.MultiInputVar("Ids");
      auto outputs_variable = context.MultiOutputVar("Outputs");
@@ -93,10 +96,10 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
      }

      // use fleet->PullSparse
      fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
                                    static_cast<uint64_t>(padding_idx),
                                    cpu_place, !is_test, &tmp_input_vec,
                                    &tmp_output_vec);
      communicator->PullSparseToTensorSync(
          static_cast<uint64_t>(table_id), emb_dim,
          static_cast<uint64_t>(padding_idx), cpu_place, !is_test,
          &tmp_input_vec, &tmp_output_vec);

      // cp temp to origin
      for (size_t idx = 0; idx < output_var_size; ++idx) {
33 changes: 23 additions & 10 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.h
Expand Up @@ -14,6 +14,7 @@
#include <string>
#include <vector>
#include "paddle/fluid/distributed/fleet.h"
#include "paddle/fluid/distributed/service/communicator.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/tensor_util.h"
@@ -32,20 +33,21 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
    auto table_id = context.Attr<int>("table_id");
    auto emb_dim = context.Attr<int>("size");
    VLOG(1) << "push_sparse.h::emb_dim: " << emb_dim;
    bool is_test = context.Attr<bool>("is_test");

    auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
    auto shows = context.Input<framework::LoDTensor>("Shows");
    auto clks = context.Input<framework::LoDTensor>("Clicks");
    auto outputs = context.MultiOutput<framework::LoDTensor>("Outputs");

    auto fleet = distributed::FleetWrapper::GetInstance();
    // auto fleet = distributed::FleetWrapper::GetInstance();
    auto *communicator = (distributed::AsyncCommunicator *)
        distributed::Communicator::GetInstance();

    if (platform::is_cpu_place(context.GetPlace())) {
      fleet->PushSparseFromTensorAsync(static_cast<uint64_t>(table_id), emb_dim,
                                       static_cast<uint64_t>(padding_idx),
                                       context.GetPlace(), &inputs, shows, clks,
                                       &outputs);
      communicator->PushSparseFromTensorAsync(
          static_cast<uint64_t>(table_id), emb_dim,
          static_cast<uint64_t>(padding_idx), context.GetPlace(), &inputs,
          shows, clks, &outputs);
    } else {
      auto inputs_variable = context.MultiInputVar("Ids");
      auto outputs_variable = context.MultiOutputVar("Outputs");
@@ -71,6 +73,17 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
        tmp_input_vec.push_back(tmp_input_tensor);
      }

      framework::Variable *tmp_shows_var = tmp_scope->Var("Shows");
      framework::LoDTensor *tmp_shows_tensor =
          tmp_shows_var->GetMutable<framework::LoDTensor>();
      framework::Variable *tmp_clicks_var = tmp_scope->Var("Clicks");
      framework::LoDTensor *tmp_clicks_tensor =
          tmp_clicks_var->GetMutable<framework::LoDTensor>();
      framework::TensorCopy(*shows, cpu_place, context.device_context(),
                            tmp_shows_tensor);
      framework::TensorCopy(*clks, cpu_place, context.device_context(),
                            tmp_clicks_tensor);

      // create temp output
      for (size_t idx = 0; idx < output_var_size; ++idx) {
        framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]);
@@ -81,10 +94,10 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
      }

      // use fleet->PullSparse
      fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
                                    static_cast<uint64_t>(padding_idx),
                                    cpu_place, !is_test, &tmp_input_vec,
                                    &tmp_output_vec);
      communicator->PushSparseFromTensorAsync(
          static_cast<uint64_t>(table_id), emb_dim,
          static_cast<uint64_t>(padding_idx), context.GetPlace(),
          &tmp_input_vec, tmp_shows_tensor, tmp_clicks_tensor, &tmp_output_vec);

      // cp temp to origin
      for (size_t idx = 0; idx < output_var_size; ++idx) {
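One behavioral detail worth spelling out for users of this operator: PushSparseFromTensorAsync accepts Shows/Clicks whose length is either the batch size or 1, and when an instance index runs past the signal it falls back to show = 1 and click = 0 (the `i >= show_size` ternaries in communicator.cc). Whether that broadcast is intended or the signals are expected to always match the batch size is not stated in the commit. A minimal standalone sketch of the fallback, with illustrative values:

// Mirrors the show fallback in PushSparseFromTensorAsync: a length-1 signal
// only covers instance 0; later instances default to a show count of 1.
#include <cstdint>
#include <cstdio>

int main() {
  const int64_t show_tensor[1] = {5};  // a single aggregated show count
  const int show_size = 1;             // i.e. shows->dims()[0] == 1
  for (int i = 0; i < 3; ++i) {
    float show = (i >= show_size) ? 1.0f : static_cast<float>(show_tensor[i]);
    std::printf("instance %d -> show %.0f\n", i, show);  // prints 5, 1, 1
  }
  return 0;
}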

1 comment on commit 3a3c9c9

@paddle-bot-old

Congratulations! Your pull request passed all required CI. You can ask reviewer(s) to approve and merge. 🎉
