diff --git a/tests/python/gpu/test_operator_gpu.py b/tests/python/gpu/test_operator_gpu.py index fdc8a5b30828..38809921f8c6 100644 --- a/tests/python/gpu/test_operator_gpu.py +++ b/tests/python/gpu/test_operator_gpu.py @@ -2102,63 +2102,51 @@ def test_bilinear_sampler_versions(): assert_almost_equal(exe.grad_dict['grid'].asnumpy(), exe_list[ref_idx].grad_dict['grid'].asnumpy(), rtol=1e-3, atol=1e-5) -@with_seed() -@unittest.skip("test fails on windows gpu. temporarily disabled till it gets fixed. tracked at /~https://github.com/apache/incubator-mxnet/issues/14368") -def test_bulking(): - # Return the execution time of a model with the specified limits to the bulked op segments - def test_bulking_helper(data_shape, num_ops, num_iterations, - max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training): - orig_environ = os.environ.copy() - try: - # Explore different ways of setting the env vars. - # The framework does not cache the bulked seg size env var lookups during symbolic. - os.environ['MXNET_EXEC_BULK_EXEC_TRAIN'] = str(enable_bulking_in_training) - if max_fwd_segment_size == max_bwd_segment_size: - os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN'] = str(max_fwd_segment_size) - os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD', None) - os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD', None) - else: - os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN', None) - os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD'] = str(max_fwd_segment_size) - os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD'] = str(max_bwd_segment_size) - - ctx = default_context() - # build symbol - X = mx.sym.Variable('X') - sym = mx.sym.flip(X, axis=0) - for _ in range(num_ops-1): - sym = mx.sym.flip(sym, axis=0) - x = mx.ndarray.zeros(data_shape) - dx = mx.ndarray.zeros(data_shape) - dy = mx.ndarray.ones(data_shape) - exe = sym.bind(ctx=ctx, args=[x], args_grad = {'X':dx}) - - # time a number of forward() and backward() executions after some warm-up iterations - warmups = 1 - for i in range(num_iterations+warmups): - if i == warmups: - start = time.time() - exe.forward(is_train=True) - exe.backward(dy) - dx.wait_to_read() - time_per_iteration = (time.time() - start) / num_iterations - finally: - os.environ.clear() - os.environ.update(orig_environ) - return time_per_iteration - +# isolated execution bulking test function to be invoked with different env var settings +def _test_bulking_in_process(seed, time_per_iteration): data_shape = (10,) num_ops = 1000 num_iterations = 20 + ctx = default_context() + # build symbol + X = mx.sym.Variable('X') + sym = mx.sym.flip(X, axis=0) + for _ in range(num_ops-1): + sym = mx.sym.flip(sym, axis=0) + x = mx.ndarray.zeros(data_shape) + dx = mx.ndarray.zeros(data_shape) + dy = mx.ndarray.ones(data_shape) + exe = sym.bind(ctx=ctx, args=[x], args_grad = {'X':dx}) + + # time a number of forward() and backward() executions after some warm-up iterations + warmups = 1 + for i in range(num_iterations+warmups): + if i == warmups: + start = time.time() + exe.forward(is_train=True) + exe.backward(dy) + dx.wait_to_read() + time_per_iteration.value = (time.time() - start) / num_iterations + +@with_seed() +def test_bulking(): # test case format: (max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training) test_cases = [(0,0,True), (1,1,True), (15,15,False), (15,0,True), (0,15,True), (15,15,True)] times = {} times_str = '' for seg_sizes in test_cases: - times[seg_sizes] = test_bulking_helper(data_shape, num_ops, num_iterations, - seg_sizes[0], seg_sizes[1], seg_sizes[2]) - times_str +=\ + # Create shared variable to return measured time from test process + time_per_iteration = mp.Manager().Value('d', 0.0) + if not run_in_spawned_process(_test_bulking_in_process, + {'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD' : seg_sizes[0], + 'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD' : seg_sizes[1], + 'MXNET_EXEC_BULK_EXEC_TRAIN' : seg_sizes[2]}, + time_per_iteration): + # skip test since the python version can't run it properly. Warning msg was logged. + return + times[seg_sizes] = time_per_iteration.value + times_str += \ '\n runtime of (fwd,bwd,enable) op seg setting ({},{},{}) =\t{:.1f} msec'.format( seg_sizes[0], seg_sizes[1], seg_sizes[2], 1000.0 * times[seg_sizes]) @@ -2170,12 +2158,12 @@ def test_bulking_helper(data_shape, num_ops, num_iterations, print(times_str) # Non-bulked times[0,0,True], times[1,1,True] and times[15,15,False] should be about the same, # slower than both half-bulked times[0,15,True] and times[15,0,True] - assert slowest_half_bulked_time < fastest_non_bulked_time,\ - 'A half-bulked exec time is slower than the non-bulked time by {} secs! {}'\ + assert slowest_half_bulked_time < fastest_non_bulked_time, \ + 'A half-bulked exec time is slower than the non-bulked time by {} secs! {}' \ .format(slowest_half_bulked_time - fastest_non_bulked_time, times_str) # The fully bulked times[15,15,True] should be faster than both half-bulked runs - assert fully_bulked_time < fastest_half_bulked_time,\ - 'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}'\ + assert fully_bulked_time < fastest_half_bulked_time, \ + 'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}' \ .format(fully_bulked_time - fastest_half_bulked_time, times_str)