From bc6edc58743b4ebdb36ec262cb8f3164afdf9642 Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Wed, 9 May 2018 19:07:58 +0000 Subject: [PATCH 01/11] Add recommendation system implementation with new API --- .../no_test_recommender_system.py | 298 ++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py diff --git a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py b/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py new file mode 100644 index 00000000000000..3f686b1fd73c56 --- /dev/null +++ b/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py @@ -0,0 +1,298 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math +import sys +import os +import numpy as np +import paddle +import paddle.fluid as fluid +import paddle.fluid.framework as framework +import paddle.fluid.layers as layers +import paddle.fluid.nets as nets +from functools import partial + +IS_SPARSE = True +USE_GPU = False +BATCH_SIZE = 256 +FEEDING_MAP = { + 'user_id': 0, + 'gender_id': 1, + 'age_id': 2, + 'job_id': 3, + 'movie_id': 4, + 'category_id': 5, + 'movie_title': 6, + 'score': 7 +} + + +def get_usr_combined_features(): + # FIXME(dzh) : old API integer_value(10) may have range check. + # currently we don't have user configurated check. + + USR_DICT_SIZE = paddle.dataset.movielens.max_user_id() + 1 + + uid = layers.data(name='user_id', shape=[1], dtype='int64') + + usr_emb = layers.embedding( + input=uid, + dtype='float32', + size=[USR_DICT_SIZE, 32], + param_attr='user_table', + is_sparse=IS_SPARSE) + + usr_fc = layers.fc(input=usr_emb, size=32) + + USR_GENDER_DICT_SIZE = 2 + + usr_gender_id = layers.data(name='gender_id', shape=[1], dtype='int64') + + usr_gender_emb = layers.embedding( + input=usr_gender_id, + size=[USR_GENDER_DICT_SIZE, 16], + param_attr='gender_table', + is_sparse=IS_SPARSE) + + usr_gender_fc = layers.fc(input=usr_gender_emb, size=16) + + USR_AGE_DICT_SIZE = len(paddle.dataset.movielens.age_table) + usr_age_id = layers.data(name='age_id', shape=[1], dtype="int64") + + usr_age_emb = layers.embedding( + input=usr_age_id, + size=[USR_AGE_DICT_SIZE, 16], + is_sparse=IS_SPARSE, + param_attr='age_table') + + usr_age_fc = layers.fc(input=usr_age_emb, size=16) + + USR_JOB_DICT_SIZE = paddle.dataset.movielens.max_job_id() + 1 + usr_job_id = layers.data(name='job_id', shape=[1], dtype="int64") + + usr_job_emb = layers.embedding( + input=usr_job_id, + size=[USR_JOB_DICT_SIZE, 16], + param_attr='job_table', + is_sparse=IS_SPARSE) + + usr_job_fc = layers.fc(input=usr_job_emb, size=16) + + concat_embed = layers.concat( + input=[usr_fc, usr_gender_fc, usr_age_fc, usr_job_fc], axis=1) + + usr_combined_features = layers.fc(input=concat_embed, size=200, act="tanh") + + return usr_combined_features + + +def get_mov_combined_features(): + + MOV_DICT_SIZE = paddle.dataset.movielens.max_movie_id() + 1 + + mov_id = layers.data(name='movie_id', shape=[1], dtype='int64') + + mov_emb = layers.embedding( + input=mov_id, + dtype='float32', + size=[MOV_DICT_SIZE, 32], + param_attr='movie_table', + is_sparse=IS_SPARSE) + + mov_fc = layers.fc(input=mov_emb, size=32) + + CATEGORY_DICT_SIZE = len(paddle.dataset.movielens.movie_categories()) + + category_id = layers.data( + name='category_id', shape=[1], dtype='int64', lod_level=1) + + mov_categories_emb = layers.embedding( + input=category_id, size=[CATEGORY_DICT_SIZE, 32], is_sparse=IS_SPARSE) + + mov_categories_hidden = layers.sequence_pool( + input=mov_categories_emb, pool_type="sum") + + MOV_TITLE_DICT_SIZE = len(paddle.dataset.movielens.get_movie_title_dict()) + + mov_title_id = layers.data( + name='movie_title', shape=[1], dtype='int64', lod_level=1) + + mov_title_emb = layers.embedding( + input=mov_title_id, size=[MOV_TITLE_DICT_SIZE, 32], is_sparse=IS_SPARSE) + + mov_title_conv = nets.sequence_conv_pool( + input=mov_title_emb, + num_filters=32, + filter_size=3, + act="tanh", + pool_type="sum") + + concat_embed = layers.concat( + input=[mov_fc, mov_categories_hidden, mov_title_conv], axis=1) + + # FIXME(dzh) : need tanh operator + mov_combined_features = layers.fc(input=concat_embed, size=200, act="tanh") + + return mov_combined_features + + +def train_network(): + usr_combined_features = get_usr_combined_features() + mov_combined_features = get_mov_combined_features() + + inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features) + scale_infer = layers.scale(x=inference, scale=5.0) + + label = layers.data(name='score', shape=[1], dtype='float32') + square_cost = layers.square_error_cost(input=scale_infer, label=label) + avg_cost = layers.mean(square_cost) + + return avg_cost, scale_infer + + +def inference_network(): + usr_combined_features = get_usr_combined_features() + mov_combined_features = get_mov_combined_features() + + inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features) + scale_infer = layers.scale(x=inference, scale=5.0) + + return scale_infer + + +def func_feed(feeding, data): + feed_tensors = {} + for (key, idx) in feeding.iteritems(): + tensor = fluid.LoDTensor() + if key != "category_id" and key != "movie_title": + if key == "score": + numpy_data = np.array(map(lambda x: x[idx], data)).astype( + "float32") + else: + numpy_data = np.array(map(lambda x: x[idx], data)).astype( + "int64") + else: + numpy_data = map(lambda x: np.array(x[idx]).astype("int64"), data) + lod_info = [len(item) for item in numpy_data] + offset = 0 + lod = [offset] + for item in lod_info: + offset += item + lod.append(offset) + numpy_data = np.concatenate(numpy_data, axis=0) + tensor.set_lod([lod]) + + numpy_data = numpy_data.reshape([numpy_data.shape[0], 1]) + tensor.set(numpy_data, place) + feed_tensors[key] = tensor + return feed_tensors + + +def train(use_cuda, save_path): + EPOCH_NUM = 1 + + feeding_map = { + 'user_id': 0, + 'gender_id': 1, + 'age_id': 2, + 'job_id': 3, + 'movie_id': 4, + 'category_id': 5, + 'movie_title': 6, + 'score': 7 + } + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.movielens.train(), buf_size=8192), + batch_size=BATCH_SIZE) + test_reader = paddle.batch( + paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) + + def event_handler(event): + if isinstance(event, fluid.EndIteration): + if (event.batch_id % 10) == 0: + avg_cost = trainer.test(reader=test_reader) + + print('BatchID {0:04}, Loss {1:2.2}'.format(event.batch_id + 1, + avg_cost)) + + if avg_cost > 0.01: # Low threshold for speeding up CI + trainer.save_params(save_path) + return + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.2) + trainer = fluid.Trainer(train_network, optimizer=sgd_optimizer, place=place) + trainer.train( + train_reader, + EPOCH_NUM, + event_handler=event_handler, + data_feed_handler=partial(func_feed, feeding_map)) + + +def infer(use_cuda, save_path): + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + inferencer = fluid.Inferencer( + inference_network, param_path=save_path, place=place) + + def create_lod_tensor(data, lod=None): + tensor = fluid.LoDTensor() + if lod is None: + # Tensor, the shape is [batch_size, 1] + index = 0 + lod_0 = [index] + for l in range(len(data)): + index += 1 + lod_0.append(index) + lod = [lod_0] + tensor.set_lod(lod) + + flattened_data = np.concatenate(data, axis=0).astype("int64") + flattened_data = flattened_data.reshape([len(flattened_data), 1]) + tensor.set(flattened_data, place) + return tensor + + # Generate a random input for inference + user_id = create_lod_tensor([[1]]) + gender_id = create_lod_tensor([[1]]) + age_id = create_lod_tensor([[0]]) + job_id = create_lod_tensor([[10]]) + movie_id = create_lod_tensor([[783]]) + category_id = create_lod_tensor([[10], [8], [9]], [[0, 3]]) + movie_title = create_lod_tensor([[1069], [4140], [2923], [710], [988]], + [[0, 5]]) + + results = inferencer.infer({ + 'user_id': user_id, + 'gender_id': gender_id, + 'age_id': age_id, + 'job_id': job_id, + 'movie_id': movie_id, + 'category_id': category_id, + 'movie_title': movie_title + }) + + print("infer results: ", results) + + +def main(use_cuda): + if use_cuda and not fluid.core.is_compiled_with_cuda(): + return + save_path = "recommender_system.inference.model" + train(use_cuda, save_path) + infer(use_cuda, save_path) + + +if __name__ == '__main__': + main(USE_CUDA) From fce6034e4e5abb0b6fb48f7c7a7eed929eac453a Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Thu, 10 May 2018 19:01:31 +0000 Subject: [PATCH 02/11] Address review comments --- .../no_test_recommender_system.py | 39 +++++++------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py b/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py index 3f686b1fd73c56..bcd5594683c078 100644 --- a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py +++ b/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py @@ -26,16 +26,6 @@ IS_SPARSE = True USE_GPU = False BATCH_SIZE = 256 -FEEDING_MAP = { - 'user_id': 0, - 'gender_id': 1, - 'age_id': 2, - 'job_id': 3, - 'movie_id': 4, - 'category_id': 5, - 'movie_title': 6, - 'score': 7 -} def get_usr_combined_features(): @@ -147,28 +137,25 @@ def get_mov_combined_features(): return mov_combined_features -def train_network(): +def inference_program(): usr_combined_features = get_usr_combined_features() mov_combined_features = get_mov_combined_features() inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features) scale_infer = layers.scale(x=inference, scale=5.0) - label = layers.data(name='score', shape=[1], dtype='float32') - square_cost = layers.square_error_cost(input=scale_infer, label=label) - avg_cost = layers.mean(square_cost) + return scale_infer - return avg_cost, scale_infer +def train_program(): -def inference_network(): - usr_combined_features = get_usr_combined_features() - mov_combined_features = get_mov_combined_features() + scale_infer = inference_program() - inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features) - scale_infer = layers.scale(x=inference, scale=5.0) + label = layers.data(name='score', shape=[1], dtype='float32') + square_cost = layers.square_error_cost(input=scale_infer, label=label) + avg_cost = layers.mean(square_cost) - return scale_infer + return avg_cost, scale_infer def func_feed(feeding, data): @@ -220,11 +207,11 @@ def train(use_cuda, save_path): paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) def event_handler(event): - if isinstance(event, fluid.EndIteration): - if (event.batch_id % 10) == 0: + if isinstance(event, fluid.EndEpochEvent): + if (event.epoch % 10) == 0: avg_cost = trainer.test(reader=test_reader) - print('BatchID {0:04}, Loss {1:2.2}'.format(event.batch_id + 1, + print('BatchID {0:04}, Loss {1:2.2}'.format(event.epoch + 1, avg_cost)) if avg_cost > 0.01: # Low threshold for speeding up CI @@ -233,7 +220,7 @@ def event_handler(event): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.2) - trainer = fluid.Trainer(train_network, optimizer=sgd_optimizer, place=place) + trainer = fluid.Trainer(train_program, optimizer=sgd_optimizer, place=place) trainer.train( train_reader, EPOCH_NUM, @@ -244,7 +231,7 @@ def event_handler(event): def infer(use_cuda, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() inferencer = fluid.Inferencer( - inference_network, param_path=save_path, place=place) + inference_program, param_path=save_path, place=place) def create_lod_tensor(data, lod=None): tensor = fluid.LoDTensor() From eec0b184cf53692ca5f4d94bbc8c303f31cba94d Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Wed, 16 May 2018 21:26:37 +0000 Subject: [PATCH 03/11] Modify as per new API --- .../no_test_recommender_system.py | 69 ++++++++++++------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py b/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py index bcd5594683c078..627e5c2ea78ce9 100644 --- a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py +++ b/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py @@ -155,7 +155,7 @@ def train_program(): square_cost = layers.square_error_cost(input=scale_infer, label=label) avg_cost = layers.mean(square_cost) - return avg_cost, scale_infer + return [avg_cost, scale_infer] def func_feed(feeding, data): @@ -186,9 +186,9 @@ def func_feed(feeding, data): return feed_tensors -def train(use_cuda, save_path): - EPOCH_NUM = 1 - +def train(use_cuda, train_program, save_path): + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + optimizer = fluid.optimizer.SGD(learning_rate=0.2) feeding_map = { 'user_id': 0, 'gender_id': 1, @@ -199,36 +199,52 @@ def train(use_cuda, save_path): 'movie_title': 6, 'score': 7 } - train_reader = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.movielens.train(), buf_size=8192), - batch_size=BATCH_SIZE) - test_reader = paddle.batch( - paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) + + trainer = fluid.Trainer( + train_func=train_program, place=place, optimizer=optimizer) + + feed_order = [ + 'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', 'category_id', + 'movie_title', 'score' + ] def event_handler(event): if isinstance(event, fluid.EndEpochEvent): - if (event.epoch % 10) == 0: - avg_cost = trainer.test(reader=test_reader) + test_reader = paddle.batch( + paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) + avg_cost_set = trainer.test( + reader=test_reader, feed_order=feed_order) - print('BatchID {0:04}, Loss {1:2.2}'.format(event.epoch + 1, - avg_cost)) + # get avg cost + avg_cost = numpy.array(avg_cost_set).mean() - if avg_cost > 0.01: # Low threshold for speeding up CI - trainer.save_params(save_path) - return + print("avg_cost: %s" % avg_cost) + + if float(avg_cost) < 6.0: # Smaller value to increase CI speed + trainer.save_params(save_dirname) + else: + print('BatchID {0}, Test Loss {1:0.2}'.format(event.epoch + 1, + float(avg_cost))) + if math.isnan(float(avg_cost)): + sys.exit("got NaN loss, training failed.") + + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.movielens.train(), buf_size=8192), + batch_size=BATCH_SIZE) - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.2) - trainer = fluid.Trainer(train_program, optimizer=sgd_optimizer, place=place) trainer.train( - train_reader, - EPOCH_NUM, + num_epochs=1, event_handler=event_handler, + reader=train_reader, + feed_order=[ + 'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', + 'category_id', 'movie_title', 'score' + ], data_feed_handler=partial(func_feed, feeding_map)) -def infer(use_cuda, save_path): +def infer(use_cuda, inference_program, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() inferencer = fluid.Inferencer( inference_program, param_path=save_path, place=place) @@ -277,8 +293,11 @@ def main(use_cuda): if use_cuda and not fluid.core.is_compiled_with_cuda(): return save_path = "recommender_system.inference.model" - train(use_cuda, save_path) - infer(use_cuda, save_path) + train(use_cuda=use_cuda, train_program=train_program, save_path=save_path) + infer( + use_cuda=use_cuda, + inference_program=inference_program, + save_path=save_path) if __name__ == '__main__': From a14423abbc5f9fabbc8158d274e5b4b627014290 Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Thu, 17 May 2018 01:16:01 +0000 Subject: [PATCH 04/11] Modify train and test functions to enable data_feed_handler --- .../tests/book/high-level-api/CMakeLists.txt | 1 + .../recommender_system/CMakeLists.txt | 7 +++ .../test_recommender_system.py} | 38 +++++++++------- python/paddle/fluid/trainer.py | 44 +++++++++++++------ 4 files changed, 59 insertions(+), 31 deletions(-) create mode 100644 python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt rename python/paddle/fluid/tests/book/{recommender_system/no_test_recommender_system.py => high-level-api/recommender_system/test_recommender_system.py} (91%) diff --git a/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt b/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt index c2a15bdb3b17b6..c2127cafba6a5f 100644 --- a/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt +++ b/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt @@ -8,3 +8,4 @@ endforeach() add_subdirectory(fit_a_line) add_subdirectory(recognize_digits) +add_subdirectory(recommender_system) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt b/python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt new file mode 100644 index 00000000000000..673c965b662a02 --- /dev/null +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/CMakeLists.txt @@ -0,0 +1,7 @@ +file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") +string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + +# default test +foreach(src ${TEST_OPS}) + py_test(${src} SRCS ${src}.py) +endforeach() diff --git a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system.py similarity index 91% rename from python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py rename to python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system.py index 627e5c2ea78ce9..5d809937cfba66 100644 --- a/python/paddle/fluid/tests/book/recommender_system/no_test_recommender_system.py +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system.py @@ -158,7 +158,7 @@ def train_program(): return [avg_cost, scale_infer] -def func_feed(feeding, data): +def func_feed(feeding, place, data): feed_tensors = {} for (key, idx) in feeding.iteritems(): tensor = fluid.LoDTensor() @@ -213,15 +213,17 @@ def event_handler(event): test_reader = paddle.batch( paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) avg_cost_set = trainer.test( - reader=test_reader, feed_order=feed_order) + reader=test_reader, + feed_order=feed_order, + data_feed_handler=partial(func_feed, feeding_map, place)) # get avg cost - avg_cost = numpy.array(avg_cost_set).mean() + avg_cost = np.array(avg_cost_set).mean() print("avg_cost: %s" % avg_cost) - if float(avg_cost) < 6.0: # Smaller value to increase CI speed - trainer.save_params(save_dirname) + if float(avg_cost) < 3: # Smaller value to increase CI speed + trainer.save_params(save_path) else: print('BatchID {0}, Test Loss {1:0.2}'.format(event.epoch + 1, float(avg_cost))) @@ -241,7 +243,7 @@ def event_handler(event): 'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', 'category_id', 'movie_title', 'score' ], - data_feed_handler=partial(func_feed, feeding_map)) + data_feed_handler=partial(func_feed, feeding_map, place)) def infer(use_cuda, inference_program, save_path): @@ -276,17 +278,19 @@ def create_lod_tensor(data, lod=None): movie_title = create_lod_tensor([[1069], [4140], [2923], [710], [988]], [[0, 5]]) - results = inferencer.infer({ - 'user_id': user_id, - 'gender_id': gender_id, - 'age_id': age_id, - 'job_id': job_id, - 'movie_id': movie_id, - 'category_id': category_id, - 'movie_title': movie_title - }) + results = inferencer.infer( + { + 'user_id': user_id, + 'gender_id': gender_id, + 'age_id': age_id, + 'job_id': job_id, + 'movie_id': movie_id, + 'category_id': category_id, + 'movie_title': movie_title + }, + return_numpy=False) - print("infer results: ", results) + print("infer results: ", np.array(results[0])) def main(use_cuda): @@ -301,4 +305,4 @@ def main(use_cuda): if __name__ == '__main__': - main(USE_CUDA) + main(USE_GPU) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index c24662ac2114c2..b18a57a77423cf 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -174,7 +174,8 @@ def train(self, event_handler, reader, feed_order, - parallel=False): + parallel=False, + data_feed_handler=None): """ Train the model. @@ -200,9 +201,10 @@ def train(self, exe.run() return - self._train_by_executor(num_epochs, event_handler, reader, feed_order) + self._train_by_executor(num_epochs, event_handler, reader, feed_order, + data_feed_handler) - def test(self, reader, feed_order): + def test(self, reader, feed_order, data_feed_handler=None): """ Test the model on given test data @@ -212,7 +214,8 @@ def test(self, reader, feed_order): order in program """ - return self._test_by_executor(reader, feed_order, self.test_outputs) + return self._test_by_executor(reader, feed_order, self.test_outputs, + data_feed_handler) def save_params(self, param_path): # reference: save_persistables in io.py @@ -228,7 +231,8 @@ def _prog_and_scope_guard(self): with executor.scope_guard(self.scope): yield - def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): + def _train_by_executor(self, num_epochs, event_handler, reader, feed_order, + data_feed_handler): """ Train by Executor and single device. @@ -243,29 +247,41 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): """ with self._prog_and_scope_guard(): feed_var_list = build_feed_var_list(self.train_program, feed_order) - feeder = data_feeder.DataFeeder( - feed_list=feed_var_list, place=self.place) + if data_feed_handler == None: + feeder = data_feeder.DataFeeder( + feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) for epoch_id in range(num_epochs): event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): event_handler(BeginStepEvent(epoch_id, step_id)) - exe.run(feed=feeder.feed(data), fetch_list=[]) + if data_feed_handler: + exe.run(feed=data_feed_handler(data), fetch_list=[]) + else: + exe.run(feed=feeder.feed(data), fetch_list=[]) + event_handler(EndStepEvent(epoch_id, step_id)) event_handler(EndEpochEvent(epoch_id)) - def _test_by_executor(self, reader, feed_order, fetch_list): + def _test_by_executor(self, reader, feed_order, fetch_list, + data_feed_handler): with executor.scope_guard(self.scope): feed_var_list = build_feed_var_list(self.test_program, feed_order) - feeder = data_feeder.DataFeeder( - feed_list=feed_var_list, place=self.place) + if data_feed_handler == None: + feeder = data_feeder.DataFeeder( + feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) accumulated = len(fetch_list) * [0] count = 0 for data in reader(): - outs = exe.run(program=self.test_program, - feed=feeder.feed(data), - fetch_list=fetch_list) + if data_feed_handler: + outs = exe.run(program=self.test_program, + feed=data_feed_handler(data), + fetch_list=fetch_list) + else: + outs = exe.run(program=self.test_program, + feed=feeder.feed(data), + fetch_list=fetch_list) accumulated = [x[0] + x[1][0] for x in zip(accumulated, outs)] count += 1 From cd788c6a6e2a45dc6a8cc4ae9ceb58b48c80949d Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Thu, 17 May 2018 03:05:33 +0000 Subject: [PATCH 05/11] Rename script to avoid same names for Cmake --- ...st_recommender_system.py => test_recommender_system_newapi.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename python/paddle/fluid/tests/book/high-level-api/recommender_system/{test_recommender_system.py => test_recommender_system_newapi.py} (100%) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py similarity index 100% rename from python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system.py rename to python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py From cca4a55fdd2f0bc705584cdb7a6c1df0c973a957 Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Fri, 18 May 2018 07:00:20 +0000 Subject: [PATCH 06/11] Fix issues with data_feed_handler --- python/paddle/fluid/trainer.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index e029bab3241372..61bc4735e09e13 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -297,14 +297,19 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order, """ with self._prog_and_scope_guard(): feed_var_list = build_feed_var_list(self.train_program, feed_order) - if data_feed_handler == None: - feeder = data_feeder.DataFeeder( - feed_list=feed_var_list, place=self.place) + feeder = data_feeder.DataFeeder( + feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) reader = feeder.decorate_reader(reader, multi_devices=False) - self._train_by_any_executor(event_handler, exe, num_epochs, reader) - - def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): + self._train_by_any_executor(event_handler, exe, num_epochs, reader, + data_feed_handler) + + def _train_by_any_executor(self, + event_handler, + exe, + num_epochs, + reader, + data_feed_handler=None): for epoch_id in range(num_epochs): event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): From 5964c5e44f669177bc22a45c63b54639f7fc1562 Mon Sep 17 00:00:00 2001 From: daming-lu Date: Wed, 23 May 2018 16:35:55 -0700 Subject: [PATCH 07/11] change trainer back, rm data_feed_handler --- .../test_recommender_system_newapi.py | 6 +-- python/paddle/fluid/trainer.py | 51 ++++++------------- 2 files changed, 17 insertions(+), 40 deletions(-) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py index 5d809937cfba66..404fc4de8ea026 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py @@ -214,8 +214,7 @@ def event_handler(event): paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) avg_cost_set = trainer.test( reader=test_reader, - feed_order=feed_order, - data_feed_handler=partial(func_feed, feeding_map, place)) + feed_order=feed_order) # get avg cost avg_cost = np.array(avg_cost_set).mean() @@ -242,8 +241,7 @@ def event_handler(event): feed_order=[ 'user_id', 'gender_id', 'age_id', 'job_id', 'movie_id', 'category_id', 'movie_title', 'score' - ], - data_feed_handler=partial(func_feed, feeding_map, place)) + ]) def infer(use_cuda, inference_program, save_path): diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 61bc4735e09e13..7da123dd92ed9d 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -217,13 +217,7 @@ def stop(self): """ self.__stop = True - def train(self, - num_epochs, - event_handler, - reader, - feed_order, - parallel=False, - data_feed_handler=None): + def train(self, num_epochs, event_handler, reader=None, feed_order=None): """ Train the model. @@ -233,8 +227,6 @@ def train(self, reader: feed_order: Feeding order of reader. None will following the defining order in program - parallel: - data_feed_handler: Function to be called when data is fed. Returns: @@ -246,14 +238,13 @@ def train(self, exe.run() return if self.parallel: - #TODO(sidgoyal78) add `data_feed_handler` to parallel executor self._train_by_parallel_executor(num_epochs, event_handler, reader, feed_order) else: self._train_by_executor(num_epochs, event_handler, reader, - feed_order, data_feed_handler) + feed_order) - def test(self, reader, feed_order, data_feed_handler=None): + def test(self, reader, feed_order): """ Test the model on given test data @@ -261,11 +252,10 @@ def test(self, reader, feed_order, data_feed_handler=None): reader: The reader that yields test data. feed_order: Feeding order of reader. None will following the defining order in program - data_feed_handler: Function that will be called when data is fed """ - return self._test_by_executor( - reader, feed_order, self.train_func_outputs, data_feed_handler) + return self._test_by_executor(reader, feed_order, + self.train_func_outputs) def save_params(self, param_path): # reference: save_persistables in io.py @@ -281,8 +271,7 @@ def _prog_and_scope_guard(self): with executor.scope_guard(self.scope): yield - def _train_by_executor(self, num_epochs, event_handler, reader, feed_order, - data_feed_handler): + def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): """ Train by Executor and single device. @@ -291,7 +280,7 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order, event_handler: reader: feed_order: - data_feed_handler: + Returns: """ @@ -301,15 +290,9 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order, feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) reader = feeder.decorate_reader(reader, multi_devices=False) - self._train_by_any_executor(event_handler, exe, num_epochs, reader, - data_feed_handler) - - def _train_by_any_executor(self, - event_handler, - exe, - num_epochs, - reader, - data_feed_handler=None): + self._train_by_any_executor(event_handler, exe, num_epochs, reader) + + def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): for epoch_id in range(num_epochs): event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): @@ -318,8 +301,7 @@ def _train_by_any_executor(self, begin_event = BeginStepEvent(epoch_id, step_id) event_handler(begin_event) if begin_event.fetch_metrics: - metrics = exe.run(feed=data_feed_handler(data) - if data_feed_handler else data, + metrics = exe.run(feed=data, fetch_list=[ var.name for var in self.train_func_outputs @@ -329,20 +311,17 @@ def _train_by_any_executor(self, event_handler(EndStepEvent(epoch_id, step_id, metrics)) event_handler(EndEpochEvent(epoch_id)) - def _test_by_executor(self, reader, feed_order, fetch_list, - data_feed_handler): + def _test_by_executor(self, reader, feed_order, fetch_list): with executor.scope_guard(self.scope): feed_var_list = build_feed_var_list(self.test_program, feed_order) - if data_feed_handler == None: - feeder = data_feeder.DataFeeder( - feed_list=feed_var_list, place=self.place) + feeder = data_feeder.DataFeeder( + feed_list=feed_var_list, place=self.place) exe = executor.Executor(self.place) accumulated = len(fetch_list) * [0] count = 0 for data in reader(): outs = exe.run(program=self.test_program, - feed=data_feed_handler(data) - if data_feed_handler else feeder.feed(data), + feed=feeder.feed(data), fetch_list=fetch_list) accumulated = [x[0] + x[1][0] for x in zip(accumulated, outs)] count += 1 From 0dcf50d0827444deda1b7963932e550c141b421f Mon Sep 17 00:00:00 2001 From: daming-lu Date: Thu, 24 May 2018 01:17:32 +0000 Subject: [PATCH 08/11] 3->4 --- .../recommender_system/test_recommender_system_newapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py index 404fc4de8ea026..5b21633ba4ce8d 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py @@ -221,7 +221,7 @@ def event_handler(event): print("avg_cost: %s" % avg_cost) - if float(avg_cost) < 3: # Smaller value to increase CI speed + if float(avg_cost) < 4: # Smaller value to increase CI speed trainer.save_params(save_path) else: print('BatchID {0}, Test Loss {1:0.2}'.format(event.epoch + 1, From 2789a532bba3fbd7dbd3489404bc8ce84603c0d6 Mon Sep 17 00:00:00 2001 From: daming-lu Date: Wed, 23 May 2018 18:20:10 -0700 Subject: [PATCH 09/11] rm func_feed --- .../test_recommender_system_newapi.py | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py index 5b21633ba4ce8d..aaa17f19b0612d 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py @@ -158,34 +158,6 @@ def train_program(): return [avg_cost, scale_infer] -def func_feed(feeding, place, data): - feed_tensors = {} - for (key, idx) in feeding.iteritems(): - tensor = fluid.LoDTensor() - if key != "category_id" and key != "movie_title": - if key == "score": - numpy_data = np.array(map(lambda x: x[idx], data)).astype( - "float32") - else: - numpy_data = np.array(map(lambda x: x[idx], data)).astype( - "int64") - else: - numpy_data = map(lambda x: np.array(x[idx]).astype("int64"), data) - lod_info = [len(item) for item in numpy_data] - offset = 0 - lod = [offset] - for item in lod_info: - offset += item - lod.append(offset) - numpy_data = np.concatenate(numpy_data, axis=0) - tensor.set_lod([lod]) - - numpy_data = numpy_data.reshape([numpy_data.shape[0], 1]) - tensor.set(numpy_data, place) - feed_tensors[key] = tensor - return feed_tensors - - def train(use_cuda, train_program, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() optimizer = fluid.optimizer.SGD(learning_rate=0.2) From 8062588e8e9c41e666f18ec7fba45d6699b92522 Mon Sep 17 00:00:00 2001 From: daming-lu Date: Wed, 23 May 2018 18:23:31 -0700 Subject: [PATCH 10/11] epoch to step --- .../test_recommender_system_newapi.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py index aaa17f19b0612d..66d6fe66a7da4b 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py @@ -14,14 +14,12 @@ import math import sys -import os import numpy as np import paddle import paddle.fluid as fluid -import paddle.fluid.framework as framework import paddle.fluid.layers as layers import paddle.fluid.nets as nets -from functools import partial + IS_SPARSE = True USE_GPU = False @@ -161,16 +159,6 @@ def train_program(): def train(use_cuda, train_program, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() optimizer = fluid.optimizer.SGD(learning_rate=0.2) - feeding_map = { - 'user_id': 0, - 'gender_id': 1, - 'age_id': 2, - 'job_id': 3, - 'movie_id': 4, - 'category_id': 5, - 'movie_title': 6, - 'score': 7 - } trainer = fluid.Trainer( train_func=train_program, place=place, optimizer=optimizer) @@ -181,7 +169,7 @@ def train(use_cuda, train_program, save_path): ] def event_handler(event): - if isinstance(event, fluid.EndEpochEvent): + if isinstance(event, fluid.EndStepEvent): test_reader = paddle.batch( paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) avg_cost_set = trainer.test( @@ -195,6 +183,7 @@ def event_handler(event): if float(avg_cost) < 4: # Smaller value to increase CI speed trainer.save_params(save_path) + trainer.stop() else: print('BatchID {0}, Test Loss {1:0.2}'.format(event.epoch + 1, float(avg_cost))) From 0f10f30d56291d9bc9658ce44f1c8bfde044851d Mon Sep 17 00:00:00 2001 From: daming-lu Date: Thu, 24 May 2018 01:26:43 +0000 Subject: [PATCH 11/11] style --- python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt | 1 - .../recommender_system/test_recommender_system_newapi.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt b/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt index 41e2fb4bcf4462..b5cd5706a78871 100644 --- a/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt +++ b/python/paddle/fluid/tests/book/high-level-api/CMakeLists.txt @@ -11,4 +11,3 @@ add_subdirectory(recognize_digits) add_subdirectory(image_classification) add_subdirectory(understand_sentiment) add_subdirectory(recommender_system) - diff --git a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py index 66d6fe66a7da4b..259680cb097a12 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py +++ b/python/paddle/fluid/tests/book/high-level-api/recommender_system/test_recommender_system_newapi.py @@ -20,7 +20,6 @@ import paddle.fluid.layers as layers import paddle.fluid.nets as nets - IS_SPARSE = True USE_GPU = False BATCH_SIZE = 256 @@ -173,8 +172,7 @@ def event_handler(event): test_reader = paddle.batch( paddle.dataset.movielens.test(), batch_size=BATCH_SIZE) avg_cost_set = trainer.test( - reader=test_reader, - feed_order=feed_order) + reader=test_reader, feed_order=feed_order) # get avg cost avg_cost = np.array(avg_cost_set).mean()