Skip to content

Commit

Permalink
Merge pull request #7715 from Yancey1989/split_selected_rows_to_multi…
Browse files Browse the repository at this point in the history
…_pserver

[WIP] Split SelectedRows to multiple pservers
  • Loading branch information
typhoonzero authored Jan 22, 2018
2 parents 85671b8 + d0a9393 commit 1861562
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 63 deletions.
2 changes: 1 addition & 1 deletion paddle/operators/recv_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class RecvOp : public framework::OperatorBase {

// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
int64_t barrier_size = param_count * fan_in;
size_t barrier_size = param_count * fan_in;
while (!exit_flag) {
// Get from multiple trainers, we don't care about the order in which
// the gradients arrive, just add suffix 0~n and merge the gradient.
Expand Down
21 changes: 7 additions & 14 deletions paddle/operators/split_selected_rows_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker {
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "The input SelectedRows.");
AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable();
AddAttr<std::vector<int>>("rows_sections", "Rows section for output.")
.SetDefault(std::vector<int>({}));
AddAttr<std::vector<int>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int>({}));
Expand All @@ -35,16 +33,16 @@ height_sections is only needed when need to split the dims of the original tenso
Example:
Input:
X.rows = {0, 7, 5}
X.rows = {7, 5}
X.height = 12
Attr:
rows_sections = {1, 2}
height_sections = {}
height_sections = {4, 8}
Out:
out0.rows = {0}
out0.height = 12
out1.rows = {7, 5}
out2.height = 12
out0.rows = {}
out0.height = 4
out1.rows = {7, 5}
out1.height = 8
)DOC");
}
Expand All @@ -61,11 +59,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel {

std::vector<int> height_sections =
ctx->Attrs().Get<std::vector<int>>("height_sections");
std::vector<int> rows_sections =
ctx->Attrs().Get<std::vector<int>>("rows_sections");
PADDLE_ENFORCE_EQ(
rows_sections.size(), ctx->Outputs("Out").size(),
"The size of rows section should be the same with Outputs size.");
int64_t n = ctx->Outputs("Out").size();

std::vector<framework::DDim> outs_dims;
Expand Down
68 changes: 49 additions & 19 deletions paddle/operators/split_selected_rows_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,70 @@ limitations under the License. */

#include <vector>
#include "paddle/framework/op_registry.h"
#include "paddle/operators/math/selected_rows_functor.h"

namespace paddle {
namespace operators {

// Locates the output section that owns `row`.
//
// `height_sections` is read as a list of consecutive half-open row ranges:
// the first range starts at 0 and each following range starts where the
// previous one ends. Returns the zero-based position of the range that
// contains `row`, or -1 when `row` falls outside every range (including
// when `height_sections` is empty).
static int FindOutIdx(int row, const std::vector<int>& height_sections) {
  int section_begin = 0;
  int section_idx = 0;
  for (const int section_height : height_sections) {
    const int section_end = section_begin + section_height;
    if (section_begin <= row && row < section_end) {
      return section_idx;
    }
    // Advance to the start of the next section.
    section_begin = section_end;
    ++section_idx;
  }
  return -1;
}

template <typename DeviceContext, typename T>
class SplitSelectedRowsOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto* x = ctx.Input<framework::SelectedRows>("X");
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");

auto rows_sections = ctx.Attr<std::vector<int>>("rows_sections");
auto height_sections = ctx.Attr<std::vector<int>>("height_sections");

int64_t n = outs.size();
int offset = 0;
auto x_rows = x->rows();
std::vector<std::vector<int>> outs_rows_idx;
outs_rows_idx.resize(outs.size());

for (int64_t i = 0; i < n; ++i) {
framework::Vector<int64_t> out_rows;
for (int64_t j = 0; j < rows_sections[i]; ++j) {
out_rows.push_back(x->rows()[offset + j]);
}
auto row_numel = x->value().numel() / x->value().dims()[0];
auto src = x->value().data<T>();

for (size_t i = 0; i < x_rows.size(); ++i) {
int out_idx = FindOutIdx(x_rows[i], height_sections);
outs_rows_idx[out_idx].push_back(i);
}
auto place = ctx.GetPlace();

auto& out = outs[i];
auto x_dims = x->GetCompleteDims();
x_dims[0] = rows_sections[i];
out->mutable_value()->mutable_data<T>(x_dims, ctx.GetPlace());
framework::Copy(x->value().Slice(offset, rows_sections[i] + offset),
x->place(), ctx.device_context(), out->mutable_value());
outs[i]->set_rows(out_rows);
if (height_sections.size()) {
outs[i]->set_height(height_sections[i]);
for (size_t i = 0; i < outs_rows_idx.size(); ++i) {
auto rows_idx = outs_rows_idx[i];
if (rows_idx.size() > 0) {
auto dims = x->GetCompleteDims();
dims[0] = rows_idx.size();
outs[i]->mutable_value()->mutable_data<T>(dims, x->place());
for (auto idx : rows_idx) {
outs[i]->mutable_rows()->push_back(x_rows[idx]);
}
auto dst = outs[i]->mutable_value()->mutable_data<T>(ctx.GetPlace());
for (size_t j = 0; j < rows_idx.size(); j++) {
if (platform::is_cpu_place(place)) {
memory::Copy(platform::CPUPlace(), dst + j * row_numel,
platform::CPUPlace(), src + rows_idx[j] * row_numel,
sizeof(T) * row_numel);
} else {
#ifdef PADDLE_WITH_CUDA
auto stream = ctx.cuda_device_context().stream();
memory::Copy(platform::CUDAPlace(), dst + j * row_numel,
platform::CUDAPlace(), src + rows_idx[j] * row_numel,
sizeof(T) * row_numel, stream);
#else
PADDLE_THROW("Paddle is not compiled with GPU");
#endif
}
}
}
offset += rows_sections[i];
}
}
};
Expand Down
32 changes: 23 additions & 9 deletions python/paddle/v2/fluid/distribute_transpiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from layer_helper import LayerHelper
from distributed_spliter import *
import math
from . import core


class VarBlock:
Expand Down Expand Up @@ -217,15 +218,28 @@ def _append_split_op(self, program, gradblocks):
if len(splited_vars) <= 1:
continue
orig_var = program.global_block().vars[varname]
sections = []
for v in splited_vars:
sections.append(v.shape[0])
program.global_block().append_op(
type="split",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"sections": sections} # assume split evenly
)
if orig_var == core.VarDesc.VarType.SELECTED_ROWS:
height_sections = []
for v in splited_vars:
height_sections.append(v.shape[0])
program.global_block().append_op(
type="split_selected_rows",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"height_sections": height_sections})
elif orig_var == core.VarDesc.VarType.LOD_TENSOR:
sections = []
for v in splited_vars:
sections.append(v.shape[0])
program.global_block().append_op(
type="split",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"sections": sections} # assume split evenly
)
else:
AssertionError("Variable type should be in set "
"[LOD_TENSOR, SELECTED_ROWS]")
return var_mapping

def get_trainer_program(self):
Expand Down
41 changes: 21 additions & 20 deletions python/paddle/v2/fluid/tests/test_split_selected_rows_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def test_check_grad(self):

def check_with_place(self, place):
scope = core.Scope()
rows = [0, 5, 7, 4]
height = 10
rows = [0, 5, 7, 4, 20]
height = 20
row_numel = 2

# initialize input variable X
Expand All @@ -46,47 +46,49 @@ def check_with_place(self, place):
np_array = np.ones((len(rows), row_numel)).astype("float32")
np_array[0, 0] = 2.0
np_array[2, 1] = 4.0
np_array[4, 1] = 8.0
x_tensor = x.get_tensor()
x_tensor.set(np_array, place)

rows_sections = [2, 2]
height_sections = []
height_sections = [5, 5, 5, 5, 3]

# initialize output variables [out0, out1]
out0 = scope.var('out0').get_selected_rows()
out1 = scope.var('out1').get_selected_rows()
outs_name = ["out%d" % i for i in xrange(len(height_sections))]
outs = [
scope.var(var_name).get_selected_rows() for var_name in outs_name
]

# expected output selected rows
expected_out0_rows = [0, 5]
expected_out1_rows = [7, 4]
expected_height = height
expected_out0_rows = [0, 4]
expected_out1_rows = [5, 7]
expected_out4_rows = [20]

op = Operator(
"split_selected_rows",
X="X",
Out=["out0", "out1"],
rows_sections=rows_sections,
Out=outs_name,
height_sections=height_sections)

op.run(scope, place)

self.assertEqual(out0.rows(), expected_out0_rows)
self.assertEqual(out1.rows(), expected_out1_rows)
self.assertEqual(outs[0].rows(), expected_out0_rows)
self.assertEqual(outs[1].rows(), expected_out1_rows)
self.assertEqual(outs[4].rows(), expected_out4_rows)

self.assertEqual(out0.height(), expected_height)
self.assertEqual(out1.height(), expected_height)
self.assertEqual(outs[0].height(), height_sections[0])
self.assertEqual(outs[4].height(), height_sections[4])

self.assertAlmostEqual(2.0, np.array(out0.get_tensor())[0, 0])
self.assertAlmostEqual(4.0, np.array(out1.get_tensor())[0, 1])
self.assertAlmostEqual(2.0, np.array(outs[0].get_tensor())[0, 0])
self.assertAlmostEqual(4.0, np.array(outs[1].get_tensor())[1, 1])
self.assertAlmostEqual(8.0, np.array(outs[4].get_tensor())[0, 1])

def check_grad_with_place(self, place):
scope = core.Scope()
height = 10
row_numel = 2

# attr
rows_sections = [2, 2]
height_sections = []
height_sections = [5, 5]

# initialize input variable X
out0_grad = scope.var("out0@GRAD").get_selected_rows()
Expand All @@ -113,7 +115,6 @@ def check_grad_with_place(self, place):
"sum",
X=["out0@GRAD", "out1@GRAD"],
Out="X@GRAD",
rows_sections=rows_sections,
height_sections=height_sections)

grad_op.run(scope, place)
Expand Down

0 comments on commit 1861562

Please sign in to comment.