Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an order switch to xmap_readers #2527

Merged
merged 25 commits into from
Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6122257
Allow printer layer to print user provided message
Jun 20, 2017
d9aac1e
add WITH_Go to disable compile go to paddle
jacquesqiao Jun 20, 2017
c9a76eb
modified xmap reader to process sample by order
wanghaoshuang Jun 20, 2017
8bc07de
format code
wanghaoshuang Jun 20, 2017
899035d
FIX: Add boost inc dir
gangliao Jun 20, 2017
4c4b689
disable go master
jacquesqiao Jun 20, 2017
b101aac
disable go master
jacquesqiao Jun 20, 2017
9e13b68
refine code
jacquesqiao Jun 20, 2017
03d181c
Merge pull request #2522 from emailweixu/print_layer
lcy-seso Jun 20, 2017
d051c2b
Merge pull request #2530 from jacquesqiao/fix-go
jacquesqiao Jun 20, 2017
64ea5a6
Merge pull request #2529 from gangliao/fix_bug
gangliao Jun 20, 2017
785a8d5
ENH: Merge multiple static libs into singe one
gangliao Jun 20, 2017
0b49395
FIX: clang-format
gangliao Jun 20, 2017
bb61adf
ENH: Typesetting CMake
gangliao Jun 20, 2017
936ac65
Fix unit test error: File exists: /root/.cache/paddle/dataset
Jun 20, 2017
6ef1ccd
Merge pull request #2536 from helinwang/2535
helinwang Jun 20, 2017
603fd43
Merge pull request #2533 from gangliao/compile
wangkuiyi Jun 21, 2017
a28ba1a
Rewrite tutorial comments in generic.cmake
wangkuiyi Jun 21, 2017
71f8c3b
Rearrange paragraphs
wangkuiyi Jun 21, 2017
252ef0c
Update
wangkuiyi Jun 21, 2017
7bce40d
Merge pull request #2538 from wangkuiyi/generic.cmake-comments
gangliao Jun 21, 2017
09cc440
modified xmap reader to process sample by order
wanghaoshuang Jun 20, 2017
cadea35
format code
wanghaoshuang Jun 20, 2017
d322c94
fix unittest
wanghaoshuang Jun 21, 2017
30eca3a
fix unittest of xmap
wanghaoshuang Jun 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions python/paddle/v2/reader/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class XmapEndSignal():
pass


def xmap_readers(mapper, reader, process_num, buffer_size):
def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
"""
Use multiple threads to map samples from reader by a mapper defined by user.
And this function contains a buffered decorator.
Expand All @@ -242,21 +242,33 @@ def xmap_readers(mapper, reader, process_num, buffer_size):
:type process_num: int
:param buffer_size: max buffer size
:type buffer_size: int
:param order: whether to yield mapped samples in the same order as the reader produces them
:type order: bool
:return: the decorated reader
:rtype: callable
"""
end = XmapEndSignal()
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
out_order = [0]

# define a worker to read samples from reader to in_queue
def read_worker(reader, in_queue):
    """Push every sample yielded by ``reader()`` onto ``in_queue``.

    Once the reader is exhausted, the ``end`` signal object from the
    enclosing scope is enqueued so consumers of ``in_queue`` can tell
    that no more samples will arrive.

    :param reader: callable returning an iterable of samples
    :param in_queue: queue that receives the raw samples
    """
    enqueue = in_queue.put
    for sample in reader():
        enqueue(sample)
    # Terminator: marks the end of the sample stream.
    enqueue(end)

# define a worker to read samples from reader to in_queue with order flag
def order_read_worker(reader, in_queue):
    """Push ``(index, sample)`` pairs from ``reader()`` onto ``in_queue``.

    The index starts at 0 and increases by one per sample, recording the
    position of each sample in the reader's output. After the reader is
    exhausted, the ``end`` signal object from the enclosing scope is
    enqueued (without an index).

    :param reader: callable returning an iterable of samples
    :param in_queue: queue that receives the tagged samples
    """
    for position, sample in enumerate(reader()):
        in_queue.put((position, sample))
    # Terminator: marks the end of the sample stream.
    in_queue.put(end)

# start a read worker in a thread
t = Thread(target=read_worker, args=(reader, in_queue))
target = order_read_worker if order else read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()

Expand All @@ -271,11 +283,28 @@ def handle_worker(in_queue, out_queue, mapper):
in_queue.put(end)
out_queue.put(end)

# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue by order
def order_handle_worker(in_queue, out_queue, mapper, out_order):
    """Map tagged samples from ``in_queue`` and emit them in reader order.

    Each item taken from ``in_queue`` is an ``(order, sample)`` pair. The
    sample is mapped immediately, but the result is only put on
    ``out_queue`` once ``out_order[0]`` equals the sample's ``order``;
    the counter is then advanced so the next sample may be emitted.

    When the end signal is received it is put back on ``in_queue`` (where
    other workers reading the same queue can pick it up) and also put on
    ``out_queue``.

    :param in_queue: queue of ``(order, sample)`` pairs followed by the
        end signal
    :param out_queue: queue that receives mapped samples
    :param mapper: callable applied to each sample
    :param out_order: one-element list holding the index of the next
        sample allowed onto ``out_queue``; a list is used so the counter
        can be mutated in place by whichever worker holds it
    """
    # Local import keeps this block self-contained regardless of what the
    # module already imports.
    import time

    ins = in_queue.get()
    while not isinstance(ins, XmapEndSignal):
        order, sample = ins
        r = mapper(sample)
        # Wait for our turn. Sleep briefly instead of spinning on `pass`,
        # so waiting workers do not burn CPU and compete with the worker
        # that actually has to run next.
        while order != out_order[0]:
            time.sleep(0.0001)
        out_queue.put(r)
        # Only the worker whose turn it is reaches this line, so the
        # increment is not contended.
        out_order[0] += 1
        ins = in_queue.get()
    in_queue.put(end)
    out_queue.put(end)

# start several handle_workers
target = order_handle_worker if order else handle_worker
args = (in_queue, out_queue, mapper, out_order) if order else (
in_queue, out_queue, mapper)
workers = []
for i in xrange(process_num):
worker = Thread(
target=handle_worker, args=(in_queue, out_queue, mapper))
worker = Thread(target=target, args=args)
worker.daemon = True
workers.append(worker)
for w in workers:
Expand Down
22 changes: 22 additions & 0 deletions python/paddle/v2/reader/tests/decorator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,27 @@ def test_shuffle(self):
self.assertEqual(total, 10)


class TestXmap(unittest.TestCase):
    """Exercises xmap_readers in both ordered and unordered modes."""

    def test_xmap(self):
        """Check mapped output over a grid of thread counts and buffer sizes.

        In ordered mode the results are compared as produced; in
        unordered mode they are sorted first. In both cases the element
        at position ``idx`` must equal ``mapper(idx)``.
        """

        def mapper(x):
            return (x + 1)

        thread_nums = (1, 2, 4, 8, 16)
        buffered_size = (1, 2, 4, 8, 16)
        for order in (True, False):
            for thread_num in thread_nums:
                for size in buffered_size:
                    mapped_reader = paddle.v2.reader.xmap_readers(
                        mapper, reader_creator_10(), thread_num, size, order)
                    result = list(mapped_reader())
                    if not order:
                        # Unordered mode gives no ordering guarantee, so
                        # normalise before comparing.
                        result.sort()
                    for idx, e in enumerate(result):
                        self.assertEqual(e, mapper(idx))


if __name__ == '__main__':
unittest.main()