Skip to content

Commit

Permalink
Enable bulking test on windows (apache#14392)
Browse files Browse the repository at this point in the history
* Re-enable test_operator_gpu.py:test_bulking.

* Add temporary debug output.

* Add test_gluon_gpu.py:test_bulking to test spawn approach.

* Reorder testing to see test_gluon_gpu.py:test_bulking result.

* Change test_operator_gpu.py:test_bulking to use spawn approach.

* Remove diagnostic output and revert Windows CI changes.
  • Loading branch information
DickJC123 authored and vdantu committed Mar 31, 2019
1 parent 8f9c21e commit e974ac2
Showing 1 changed file with 40 additions and 52 deletions.
92 changes: 40 additions & 52 deletions tests/python/gpu/test_operator_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2102,63 +2102,51 @@ def test_bilinear_sampler_versions():
assert_almost_equal(exe.grad_dict['grid'].asnumpy(), exe_list[ref_idx].grad_dict['grid'].asnumpy(), rtol=1e-3, atol=1e-5)


@with_seed()
@unittest.skip("test fails on windows gpu. temporarily disabled till it gets fixed. tracked at /~https://github.com/apache/incubator-mxnet/issues/14368")
def test_bulking():
# Return the execution time of a model with the specified limits to the bulked op segments
def test_bulking_helper(data_shape, num_ops, num_iterations,
                        max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training):
    """Time forward/backward passes of a chain of ``flip`` ops under given bulking env vars.

    The process environment is modified for the duration of the measurement and
    restored to its previous contents before returning, even on error.

    Parameters
    ----------
    data_shape : shape passed to ``mx.ndarray.zeros`` / ``mx.ndarray.ones``
    num_ops : number of chained ``flip`` ops in the symbol (at least one is always built)
    num_iterations : number of timed forward+backward iterations (one extra warm-up runs first)
    max_fwd_segment_size, max_bwd_segment_size : values written to the
        ``MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN*`` environment variables
    enable_bulking_in_training : value written to ``MXNET_EXEC_BULK_EXEC_TRAIN``

    Returns
    -------
    Average wall-clock seconds per timed iteration.
    """
    saved_environ = os.environ.copy()
    try:
        # Explore different ways of setting the env vars.
        # The framework does not cache the bulked seg size env var lookups during symbolic.
        os.environ['MXNET_EXEC_BULK_EXEC_TRAIN'] = str(enable_bulking_in_training)
        if max_fwd_segment_size != max_bwd_segment_size:
            # Distinct limits: use the direction-specific variables only.
            os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN', None)
            os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD'] = str(max_fwd_segment_size)
            os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD'] = str(max_bwd_segment_size)
        else:
            # Equal limits: use the single combined variable and drop the specific ones.
            os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN'] = str(max_fwd_segment_size)
            os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD', None)
            os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD', None)

        ctx = default_context()
        # Build a symbol consisting of a chain of flip ops on a single input.
        X = mx.sym.Variable('X')
        chain = mx.sym.flip(X, axis=0)
        for _ in range(1, num_ops):
            chain = mx.sym.flip(chain, axis=0)
        x = mx.ndarray.zeros(data_shape)
        dx = mx.ndarray.zeros(data_shape)
        dy = mx.ndarray.ones(data_shape)
        exe = chain.bind(ctx=ctx, args=[x], args_grad={'X': dx})

        # Run the warm-up iteration(s) first; the clock starts on the first timed iteration.
        warmups = 1
        total_iterations = warmups + num_iterations
        for step in range(total_iterations):
            if step == warmups:
                start = time.time()
            exe.forward(is_train=True)
            exe.backward(dy)
            # Block until the gradient has been written so the iteration is fully timed.
            dx.wait_to_read()
        elapsed = time.time() - start
        time_per_iteration = elapsed / num_iterations
    finally:
        # Put the environment back exactly as it was on entry.
        os.environ.clear()
        os.environ.update(saved_environ)
    return time_per_iteration

# isolated execution bulking test function to be invoked with different env var settings
def _test_bulking_in_process(seed, time_per_iteration):
    """Time forward/backward passes of a chain of ``flip`` ops and report the result.

    Parameters
    ----------
    seed : not used by this function's body.
    time_per_iteration : object with a writable ``value`` attribute; the average
        wall-clock seconds per timed iteration is stored there instead of being returned.
    """
    data_shape = (10,)
    num_ops = 1000
    num_iterations = 20

    ctx = default_context()
    # Build a symbol consisting of a chain of num_ops flip ops on a single input.
    X = mx.sym.Variable('X')
    chain = mx.sym.flip(X, axis=0)
    for _ in range(1, num_ops):
        chain = mx.sym.flip(chain, axis=0)
    x = mx.ndarray.zeros(data_shape)
    dx = mx.ndarray.zeros(data_shape)
    dy = mx.ndarray.ones(data_shape)
    exe = chain.bind(ctx=ctx, args=[x], args_grad={'X': dx})

    # Run the warm-up iteration(s) first; the clock starts on the first timed iteration.
    warmups = 1
    total_iterations = warmups + num_iterations
    for step in range(total_iterations):
        if step == warmups:
            start = time.time()
        exe.forward(is_train=True)
        exe.backward(dy)
        # Block until the gradient has been written so the iteration is fully timed.
        dx.wait_to_read()
    elapsed = time.time() - start
    time_per_iteration.value = elapsed / num_iterations

@with_seed()
def test_bulking():
# test case format: (max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training)
test_cases = [(0,0,True), (1,1,True), (15,15,False), (15,0,True), (0,15,True), (15,15,True)]
times = {}
times_str = ''
for seg_sizes in test_cases:
times[seg_sizes] = test_bulking_helper(data_shape, num_ops, num_iterations,
seg_sizes[0], seg_sizes[1], seg_sizes[2])
times_str +=\
# Create shared variable to return measured time from test process
time_per_iteration = mp.Manager().Value('d', 0.0)
if not run_in_spawned_process(_test_bulking_in_process,
{'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD' : seg_sizes[0],
'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD' : seg_sizes[1],
'MXNET_EXEC_BULK_EXEC_TRAIN' : seg_sizes[2]},
time_per_iteration):
# skip test since the python version can't run it properly. Warning msg was logged.
return
times[seg_sizes] = time_per_iteration.value
times_str += \
'\n runtime of (fwd,bwd,enable) op seg setting ({},{},{}) =\t{:.1f} msec'.format(
seg_sizes[0], seg_sizes[1], seg_sizes[2], 1000.0 * times[seg_sizes])

Expand All @@ -2170,12 +2158,12 @@ def test_bulking_helper(data_shape, num_ops, num_iterations,
print(times_str)
# Non-bulked times[0,0,True], times[1,1,True] and times[15,15,False] should be about the same,
# slower than both half-bulked times[0,15,True] and times[15,0,True]
assert slowest_half_bulked_time < fastest_non_bulked_time,\
'A half-bulked exec time is slower than the non-bulked time by {} secs! {}'\
assert slowest_half_bulked_time < fastest_non_bulked_time, \
'A half-bulked exec time is slower than the non-bulked time by {} secs! {}' \
.format(slowest_half_bulked_time - fastest_non_bulked_time, times_str)
# The fully bulked times[15,15,True] should be faster than both half-bulked runs
assert fully_bulked_time < fastest_half_bulked_time,\
'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}'\
assert fully_bulked_time < fastest_half_bulked_time, \
'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}' \
.format(fully_bulked_time - fastest_half_bulked_time, times_str)


Expand Down

0 comments on commit e974ac2

Please sign in to comment.