Skip to content

Commit

Permalink
add ctr table depends (#36465)
Browse files Browse the repository at this point in the history
* add ctr table depends

* code style

* fix

* fix

* fix naming

* rename

* rename
  • Loading branch information
zhaocaibei123 authored Oct 21, 2021
1 parent 7253398 commit d64f7b3
Show file tree
Hide file tree
Showing 10 changed files with 937 additions and 3 deletions.
65 changes: 65 additions & 0 deletions paddle/fluid/distributed/common/local_random.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <assert.h>
#include <time.h>
#include <atomic>
#include <random>

namespace paddle {
namespace distributed {

// Get time in seconds.
inline double current_realtime() {
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
return tp.tv_sec + tp.tv_nsec * 1e-9;
}

inline std::default_random_engine& local_random_engine() {
struct engine_wrapper_t {
std::default_random_engine engine;
engine_wrapper_t() {
static std::atomic<unsigned long> x(0); // NOLINT
std::seed_seq sseq = {
x++, x++, x++, (unsigned long)(current_realtime() * 1000)}; // NOLINT
engine.seed(sseq);
}
};
thread_local engine_wrapper_t r;
return r.engine;
}

template <class T = double>
std::uniform_real_distribution<T>& local_uniform_real_distribution() {
thread_local std::uniform_real_distribution<T> distr;
assert(distr.a() == 0.0 && distr.b() == 1.0);
return distr;
}

template <class T = double>
T uniform_real() {
return local_uniform_real_distribution<T>()(local_random_engine());
}

template <class T = double>
T uniform_real(T a, T b) {
if (a == b) {
return a;
}
return (T)(a + uniform_real<T>() * (b - a));
}
} // namespace distributed
} // namespace paddle
68 changes: 68 additions & 0 deletions paddle/fluid/distributed/ps.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,41 @@ message TableParameter {

message TableAccessorParameter {
optional string accessor_class = 1;
// optional SparseSGDRuleParameter sparse_sgd_param = 2;
optional uint32 fea_dim = 4 [ default = 11 ];
optional uint32 embedx_dim = 5 [ default = 8 ];
optional uint32 embedx_threshold = 6 [ default = 10 ];
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
// optional SparseCommonSGDRuleParameter sparse_commonsgd_param = 9;
optional SparseCommonSGDRuleParameter embed_sgd_param = 10;
optional SparseCommonSGDRuleParameter embedx_sgd_param = 11;
}

message CtrAccessorParameter {
optional float nonclk_coeff = 1
[ default = 0.1 ]; // to calculate show_click_score
optional float click_coeff = 2
[ default = 1 ]; // to calculate show_click_score
optional float base_threshold = 3 [
default = 1.5
]; // show_click_score > base_threshold, this feature can be saved
optional float delta_threshold = 4
[ default =
0.25 ]; // delta_score > delta_threshold, this feature can be saved
optional float delta_keep_days = 5
[ default =
16 ]; // unseen_day < delta_keep_days, this feature can be saved
optional float show_click_decay_rate = 6 [
default = 0.98
]; // show/click will update to show/click * show_click_decay_rate after a day
optional float delete_threshold = 7
[ default = 0.8 ]; // threshold to shrink a feasign
optional float delete_after_unseen_days = 8
[ default = 30 ]; // unseen_day > delete_after_unseen_days, this feature
// will be delete in shrink_model
optional int32 ssd_unseenday_threshold = 9
[ default = 1 ]; // threshold to save ssd
}

message TensorAccessorParameter {
Expand Down Expand Up @@ -150,3 +181,40 @@ message TableAccessorSaveParameter {
optional string converter = 2;
optional string deconverter = 3;
}

// message SparseSGDRuleParameter {
// optional double learning_rate = 1 [default = 0.05];
// optional double initial_g2sum = 2 [default = 3.0];
// optional double initial_range = 3 [default = 0.0001];
// repeated float weight_bounds = 4;
//}

message SparseCommonSGDRuleParameter {
optional string name = 1;
optional SparseNaiveSGDRuleParameter naive = 2;
optional SparseAdagradSGDRuleParameter adagrad = 3;
optional SparseAdamSGDParameter adam = 4;
}

message SparseNaiveSGDRuleParameter { // SparseNaiveSGDRule
optional double learning_rate = 1 [ default = 0.05 ];
optional double initial_range = 2 [ default = 0.0001 ];
repeated float weight_bounds = 3;
}

message
SparseAdagradSGDRuleParameter { // SparseAdaGradSGDRule|StdAdaGradSGDRule
optional double learning_rate = 1 [ default = 0.05 ];
optional double initial_g2sum = 2 [ default = 3.0 ];
optional double initial_range = 3 [ default = 0.0001 ];
repeated float weight_bounds = 4;
}

message SparseAdamSGDParameter { // SparseAdamSGDRule
optional double learning_rate = 1 [ default = 0.001 ];
optional double initial_range = 2 [ default = 0.0001 ];
optional double beta1_decay_rate = 3 [ default = 0.9 ];
optional double beta2_decay_rate = 4 [ default = 0.999 ];
optional double ada_epsilon = 5 [ default = 1e-08 ];
repeated float weight_bounds = 6;
}
6 changes: 5 additions & 1 deletion paddle/fluid/distributed/table/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_
cc_library(tensor_table SRCS tensor_table.cc DEPS eigen3 ps_framework_proto executor scope device_context tensor ${TABLE_DEPS})
set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})

cc_library(table SRCS table.cc DEPS common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost)
set_source_files_properties(sparse_sgd_rule.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(sparse_sgd_rule SRCS sparse_sgd_rule.cc DEPS ${TABLE_DEPS} ps_framework_proto)


cc_library(table SRCS table.cc DEPS common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost sparse_sgd_rule)
167 changes: 167 additions & 0 deletions paddle/fluid/distributed/table/depends/feature_value.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <ThreadPool.h>
#include <functional>
#include <future> // NOLINT
#include <memory>
#include <string>
#include <thread> // NOLINT
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "gflags/gflags.h"

#include "butil/object_pool.h"
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/distributed/thirdparty/round_robin.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/rw_lock.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace distributed {

static const int CTR_SPARSE_SHARD_BUCKET_NUM_BITS = 6;
static const size_t CTR_SPARSE_SHARD_BUCKET_NUM =
static_cast<size_t>(1) << CTR_SPARSE_SHARD_BUCKET_NUM_BITS;

class FixedFeatureValue {
public:
FixedFeatureValue() {}
~FixedFeatureValue() {}
float *data() { return data_.data(); }
size_t size() { return data_.size(); }
void resize(size_t size) { data_.resize(size); }
void shrink_to_fit() { data_.shrink_to_fit(); }

private:
std::vector<float> data_;
};

class SparseTableShard {
public:
typedef typename robin_hood::unordered_map<uint64_t, FixedFeatureValue *>
map_type;
SparseTableShard() {}
~SparseTableShard() {}

FixedFeatureValue *Init(const uint64_t &id) {
size_t hash = hasher_(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];

FixedFeatureValue *value = nullptr;
value = butil::get_object<FixedFeatureValue>();
table[id] = value;
return value;
}

// dont judge if (has(id))
float *Get(const uint64_t &id) {
size_t hash = hasher_(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];

// auto &value = table.at(id);
// return value->data_.data();
auto res = table.find(id);
FixedFeatureValue *value = res->second;
return value->data();
}

// for load, to reset count, unseen_days
FixedFeatureValue *GetValue(const uint64_t &id) {
size_t hash = hasher_(id);
size_t bucket = compute_bucket(hash);

auto &table = values_[bucket];
auto res = table.find(id);
return res->second;
}

void erase(uint64_t feasign) {
size_t hash = hasher_(feasign);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];

auto iter = table.find(feasign);
if (iter != table.end()) {
butil::return_object(iter->second);
iter = table.erase(iter);
}
}

void clear() {}

size_t compute_bucket(size_t hash) {
if (CTR_SPARSE_SHARD_BUCKET_NUM == 1) {
return 0;
} else {
return hash >> (sizeof(size_t) * 8 - CTR_SPARSE_SHARD_BUCKET_NUM_BITS);
}
}

map_type::iterator end() {
return values_[CTR_SPARSE_SHARD_BUCKET_NUM - 1].end();
}

map_type::iterator Find(uint64_t id) {
size_t hash = hasher_(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];

auto got = table.find(id);
if (got == table.end()) {
return end();
} else {
return got;
}
}

private:
bool Has(const uint64_t id) {
size_t hash = hasher_(id);
size_t bucket = compute_bucket(hash);
auto &table = values_[bucket];

auto got = table.find(id);
if (got == table.end()) {
return false;
} else {
return true;
}
}

public:
map_type values_[CTR_SPARSE_SHARD_BUCKET_NUM];
std::hash<uint64_t> hasher_;
};

} // namespace distributed
} // namespace paddle
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/table/depends/sparse_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ struct PullSparseValue {
feasigns_(nullptr),
frequencies_(nullptr) {}

explicit PullSparseValue(std::vector<uint64_t> feasigns,
std::vector<uint32_t> frequencies, int dim) {
explicit PullSparseValue(std::vector<uint64_t>& feasigns, // NOLINT
std::vector<uint32_t>& frequencies, // NOLINT
int dim) {
numel_ = feasigns.size();
dim_ = dim;
is_training_ = true;
Expand Down
Loading

0 comments on commit d64f7b3

Please sign in to comment.