Handle horovod errors #24

Merged
8 commits merged on Jan 18, 2019
Changes from 4 commits
1 change: 0 additions & 1 deletion examples/mxnet_imagenet_resnet50.py
@@ -20,7 +20,6 @@
import math
import os


from gluoncv.model_zoo import get_model
import horovod.mxnet as hvd
import mxnet as mx
5 changes: 4 additions & 1 deletion horovod/mxnet/__init__.py
@@ -19,8 +19,11 @@

from horovod.common import check_extension

from horovod.mxnet.mpi_ops import allreduce, allreduce_
check_extension('horovod.mxnet', 'HOROVOD_WITH_MXNET',
__file__, 'mpi_lib')

from horovod.mxnet.mpi_ops import allgather
from horovod.mxnet.mpi_ops import allreduce, allreduce_
from horovod.mxnet.mpi_ops import broadcast, broadcast_
from horovod.mxnet.mpi_ops import init, shutdown
from horovod.mxnet.mpi_ops import size, local_size, rank, local_rank
15 changes: 0 additions & 15 deletions horovod/mxnet/adapter.cc
@@ -17,8 +17,6 @@
#include "cuda.h"
#endif

#include <mxnet/base.h>

#include "adapter.h"
#include "cuda_util.h"
#include "tensor_util.h"
@@ -124,19 +122,6 @@ template <class T> Framework MXOpContext<T>::framework() const {
return Framework::MXNET;
}

void ThrowIfError(Status status) {
switch (status.type()) {
case StatusType::OK:
return;
case StatusType::PRECONDITION_ERROR:
throw std::logic_error(status.reason());
case StatusType::ABORTED:
throw std::runtime_error(status.reason());
default: // Includes UNKNOWN_ERROR
throw std::runtime_error(status.reason());
}
}

template class MXTensor<NDArray>;
template class MXTemporaryBuffer<NDArray>;
template class MXOpContext<NDArray>;
8 changes: 7 additions & 1 deletion horovod/mxnet/adapter.h
@@ -16,6 +16,8 @@
#ifndef HOROVOD_MXNET_ADAPTER_H
#define HOROVOD_MXNET_ADAPTER_H

#include <mxnet/base.h>

apeforest marked this conversation as resolved.
#include "../common/common.h"

namespace horovod {
@@ -68,7 +70,11 @@ template <class T> class MXOpContext : public OpContext {
T* output_;
};

void ThrowIfError(Status status);
inline void ThrowIfError(const Status& status) {
if (!status.ok()) {
throw dmlc::Error(status.reason());
}
}

} // namespace mxnet
} // namespace horovod
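
Note on the change above: the out-of-line `ThrowIfError` removed from `adapter.cc` (which mapped status types to `std::logic_error` or `std::runtime_error`) is replaced by this inline helper that turns every non-OK status into a `dmlc::Error`, a single exception type that can be caught at the MXNet C API boundary. A minimal, self-contained sketch of that pattern follows; `Status` and `Error` below are simplified stand-ins for `horovod::common::Status` and `dmlc::Error`, not the real library types.

```cpp
// Minimal sketch of the ThrowIfError pattern added to adapter.h.
// Status and Error are simplified stand-ins for horovod::common::Status
// and dmlc::Error; they are not the real library types.
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

struct Status {
  bool ok_flag;
  std::string reason_text;
  bool ok() const { return ok_flag; }
  const std::string& reason() const { return reason_text; }
  static Status OK() { return {true, ""}; }
  static Status PreconditionError(std::string msg) {
    return {false, std::move(msg)};
  }
};

// Stand-in for dmlc::Error, which derives from std::runtime_error.
struct Error : public std::runtime_error {
  explicit Error(const std::string& msg) : std::runtime_error(msg) {}
};

// Every non-OK status becomes one well-known exception type.
inline void ThrowIfError(const Status& status) {
  if (!status.ok()) {
    throw Error(status.reason());
  }
}

int main() {
  ThrowIfError(Status::OK());  // no-op on success
  try {
    ThrowIfError(Status::PreconditionError("Horovod has not been initialized"));
  } catch (const Error& e) {
    std::cout << "caught: " << e.what() << "\n";
  }
  return 0;
}
```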
83 changes: 45 additions & 38 deletions horovod/mxnet/mpi_ops.cc
@@ -13,9 +13,6 @@
// limitations under the License.
// =============================================================================

#include <chrono>
#include <memory>
#include <thread>
#include <atomic>

#include "../common/operations.h"
Expand Down Expand Up @@ -43,7 +40,7 @@ std::string GetOpName(std::string prefix, char* name) {
} // namespace

void DoAllreduce(NDArray* tensor, NDArray* output, const std::string& name,
Callback cb) {
Callback on_complete) {
ThrowIfError(common::CheckInitialized());

auto device = TensorUtil::GetDevice(tensor);
@@ -54,55 +51,56 @@ void DoAllreduce(NDArray* tensor, NDArray* output, const std::string& name,
auto enqueue_result =
EnqueueTensorAllreduce(hvd_context, hvd_tensor, hvd_output, nullptr,
name, device,
[cb](const Status& status) {
cb();
[on_complete](const Status& status) {
on_complete(status.ok() ? nullptr : status.reason().c_str());
});
ThrowIfError(enqueue_result);
}

#if HAVE_CUDA
void DoAllreduceCudaOnCPU(NDArray* tensor, NDArray* output, std::string& name,
Callback cb) {
Callback on_complete) {
ThrowIfError(common::CheckInitialized());

// Make async copy of input tensor to CPU tensor and record completion event.
auto hvd_cpu_buffer = std::make_shared<MXTemporaryBuffer<NDArray>>(
CPU_DEVICE_ID, tensor->dtype());
TensorUtil::AsyncCopyCudaToCPU(tensor, hvd_cpu_buffer->tensor());
auto ready_event = std::make_shared<MXReadyEvent<NDArray>>(tensor);

auto hvd_context = std::make_shared<MXOpContext<NDArray>>(
CPU_DEVICE_ID, hvd_cpu_buffer->tensor());
auto ready_event = std::make_shared<MXReadyEvent<NDArray>>(tensor);
yuxihu marked this conversation as resolved.

auto enqueue_result = EnqueueTensorAllreduce(
hvd_context, hvd_cpu_buffer, hvd_cpu_buffer, ready_event,
name, CPU_DEVICE_ID,
[hvd_cpu_buffer, output, cb](const Status& status) {
[hvd_cpu_buffer, output, on_complete](const Status& status) {
TensorUtil::CopyCPUToCuda(hvd_cpu_buffer->tensor(), output);
cb();
on_complete(status.ok() ? nullptr : status.reason().c_str());
});
ThrowIfError(enqueue_result);
}
#endif

void DoAllgather(NDArray* tensor, NDArray* output, std::string& name,
Callback cb) {
Callback on_complete) {
ThrowIfError(common::CheckInitialized());

auto device = TensorUtil::GetDevice(tensor);
auto hvd_tensor = std::make_shared<MXTensor<NDArray>>(tensor);
auto hvd_context = std::make_shared<MXOpContext<NDArray>>(device, output);

auto enqueue_result =
EnqueueTensorAllgather(hvd_context, hvd_tensor, nullptr,
name, device,
[cb](const Status& status) {
cb();
[on_complete](const Status& status) {
on_complete(status.ok() ? nullptr : status.reason().c_str());
});
ThrowIfError(enqueue_result);
}

#if HAVE_CUDA
void DoAllgatherCudaOnCPU(NDArray* tensor, NDArray* output, std::string& name,
Callback cb) {
Callback on_complete) {
ThrowIfError(common::CheckInitialized());

// Make async copy of input tensor to CPU tensor and record completion event.
@@ -119,17 +117,18 @@ void DoAllgatherCudaOnCPU(NDArray* tensor, NDArray* output, std::string& name,
auto enqueue_result = EnqueueTensorAllgather(
hvd_context, hvd_cpu_tensor, ready_event,
name, CPU_DEVICE_ID,
[hvd_cpu_output, output, cb](const Status& status) {
[hvd_cpu_output, output, on_complete](const Status& status) {
TensorUtil::CopyCPUToCuda(hvd_cpu_output->tensor(), output);
cb();
on_complete(status.ok() ? nullptr : status.reason().c_str());
yuxihu marked this conversation as resolved.
});
ThrowIfError(enqueue_result);
}
#endif

void DoBroadcast(NDArray* tensor, NDArray* output, int root_rank,
std::string& name, Callback cb) {
std::string& name, Callback on_complete) {
ThrowIfError(common::CheckInitialized());

auto device = TensorUtil::GetDevice(tensor);
auto hvd_tensor = std::make_shared<MXTensor<NDArray>>(tensor);
auto hvd_context = std::make_shared<MXOpContext<NDArray>>(device, output);
@@ -145,16 +144,16 @@ void DoBroadcast(NDArray* tensor, NDArray* output, int root_rank,
auto enqueue_result = EnqueueTensorBroadcast(
hvd_context, hvd_tensor, hvd_output, root_rank, nullptr,
name, device,
[cb](const Status& status) {
cb();
[on_complete](const Status& status) {
on_complete(status.ok() ? nullptr : status.reason().c_str());
});
ThrowIfError(enqueue_result);
}

#if HAVE_CUDA
void DoBroadcastCudaOnCPU(
std::shared_ptr<MXTemporaryBuffer<NDArray>>& hvd_cpu_buffer, int root_rank,
std::string& name, Callback cb) {
std::string& name, Callback on_complete) {
// Make async copy of input tensor to CPU tensor and record completion event.
auto hvd_context = std::make_shared<MXOpContext<NDArray>>(
CPU_DEVICE_ID, hvd_cpu_buffer->tensor());
@@ -164,27 +163,29 @@ void DoBroadcastCudaOnCPU(
auto enqueue_result = EnqueueTensorBroadcast(
hvd_context, hvd_cpu_buffer, hvd_cpu_buffer, root_rank, ready_event,
name, CPU_DEVICE_ID,
[cb](const Status& status) {
cb();
[on_complete](const Status& status) {
on_complete(status.ok() ? nullptr : status.reason().c_str());
});
ThrowIfError(enqueue_result);
}
#endif

extern "C" int horovod_mxnet_allreduce_async(NDArray* input, NDArray* output,
char* name, bool average) {
MX_API_BEGIN();

std::string op_name = GetOpName("allreduce", name);
auto allreduce_async_fn = [input, output,
op_name](RunContext rctx,
Callback cb) mutable {
DoAllreduce(input, output, op_name, cb);
Callback on_complete) mutable {
DoAllreduce(input, output, op_name, on_complete);
};

#if HAVE_CUDA
auto allreduce_async_cpu_fn =
[input, output, op_name](RunContext rctx,
Callback cb) mutable {
DoAllreduceCudaOnCPU(input, output, op_name, cb);
Callback on_complete) mutable {
DoAllreduceCudaOnCPU(input, output, op_name, on_complete);
};
#endif

@@ -217,23 +218,26 @@ extern "C" int horovod_mxnet_allreduce_async(NDArray* input, NDArray* output,
if (average) {
*output /= horovod_size();
}
return 0;

MX_API_END();
}

extern "C" int horovod_mxnet_allgather_async(NDArray* input, NDArray* output,
char* name) {
MX_API_BEGIN();

std::string op_name = GetOpName("allgather", name);
auto allgather_async_fn = [input, output,
op_name](RunContext rctx,
Callback cb) mutable {
DoAllgather(input, output, op_name, cb);
Callback on_complete) mutable {
DoAllgather(input, output, op_name, on_complete);
};

#if HAVE_CUDA
auto allgather_async_cpu_fn =
[input, output, op_name](RunContext rctx,
Callback cb) mutable {
DoAllgatherCudaOnCPU(input, output, op_name, cb);
Callback on_complete) mutable {
DoAllgatherCudaOnCPU(input, output, op_name, on_complete);
};
#endif

@@ -261,17 +265,19 @@ extern "C" int horovod_mxnet_allgather_async(NDArray* input, NDArray* output,
"HorovodAllgather");
}
#endif
return 0;

MX_API_END();
}

extern "C" int horovod_mxnet_broadcast_async(NDArray* input, NDArray* output,
int root_rank, char* name) {
MX_API_BEGIN();

std::string op_name = GetOpName("broadcast", name);
auto broadcast_async_fn = [input, output, op_name,
root_rank](RunContext rctx,
Callback cb) mutable {
DoBroadcast(input, output, root_rank, op_name, cb);
Callback on_complete) mutable {
DoBroadcast(input, output, root_rank, op_name, on_complete);
};

#if HAVE_CUDA && HOROVOD_GPU_BROADCAST != 'M'
@@ -283,8 +289,8 @@ extern "C" int horovod_mxnet_broadcast_async(NDArray* input, NDArray* output,
TensorUtil::AsyncCopyCudaToCPU(input, hvd_cpu_buffer->tensor());
auto broadcast_async_cpu_fn =
[hvd_cpu_buffer, op_name, root_rank]
(RunContext rctx, Callback cb) mutable {
DoBroadcastCudaOnCPU(hvd_cpu_buffer, root_rank, op_name, cb);
(RunContext rctx, Callback on_complete) mutable {
DoBroadcastCudaOnCPU(hvd_cpu_buffer, root_rank, op_name, on_complete);
};

Engine::Get()->PushAsync(broadcast_async_cpu_fn, input->ctx(), {},
@@ -297,7 +303,8 @@ extern "C" int horovod_mxnet_broadcast_async(NDArray* input, NDArray* output,
{output->var()}, FnProperty::kNormal, 0,
"HorovodBroadcast");
#endif
return 0;

MX_API_END();
}

} // namespace mxnet
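
A recurring change in this file: the engine completion callback (renamed from `cb` to `on_complete`) is no longer invoked with no arguments; each Horovod status handler now forwards either a null pointer on success or the failure reason string on error. Below is a rough, self-contained sketch of that hand-off; `Status`, `Callback`, `EnqueueAsync`, and `DoOperation` are hypothetical stand-ins, not the real Horovod or MXNet APIs.

```cpp
// Sketch of the completion-callback hand-off used throughout mpi_ops.cc.
// Status, Callback, EnqueueAsync, and DoOperation are hypothetical
// stand-ins, not the real Horovod or MXNet types.
#include <functional>
#include <iostream>
#include <string>

struct Status {
  bool ok_flag;
  std::string reason_text;
  bool ok() const { return ok_flag; }
  const std::string& reason() const { return reason_text; }
};

// Engine-style callback: nullptr means success, otherwise an error message.
using Callback = std::function<void(const char*)>;

// Pretend "async" enqueue that immediately completes with the given status
// and notifies the registered status handler.
void EnqueueAsync(const Status& result,
                  const std::function<void(const Status&)>& on_done) {
  on_done(result);
}

void DoOperation(const Status& simulated_result, Callback on_complete) {
  EnqueueAsync(simulated_result, [on_complete](const Status& status) {
    // The pattern from this PR: forward success as nullptr and failure as
    // the human-readable reason.
    on_complete(status.ok() ? nullptr : status.reason().c_str());
  });
}

int main() {
  Callback print_result = [](const char* error) {
    if (error == nullptr) {
      std::cout << "operation finished without error\n";
    } else {
      std::cout << "operation failed: " << error << "\n";
    }
  };

  DoOperation(Status{true, ""}, print_result);
  DoOperation(Status{false, "allreduce tensor size mismatch"}, print_result);
  return 0;
}
```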
1 change: 1 addition & 0 deletions horovod/mxnet/mpi_ops.h
@@ -18,6 +18,7 @@

#include <mxnet/base.h>
#include <mxnet/c_api.h>
#include <mxnet/c_api_error.h>
#include <mxnet/engine.h>
#include <mxnet/ndarray.h>
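
The newly included `<mxnet/c_api_error.h>` brings in the `MX_API_BEGIN()`/`MX_API_END()` macros used above in `mpi_ops.cc`: they bracket each `extern "C"` entry point in a try/catch so a thrown exception is reported as a non-zero return code instead of escaping across the C boundary. A simplified, self-contained sketch of that idea follows; `API_BEGIN`/`API_END` and `last_error` are illustrative stand-ins, not MXNet's actual definitions.

```cpp
// Simplified sketch of an exception-to-error-code boundary in the style of
// MX_API_BEGIN()/MX_API_END(). API_BEGIN/API_END and last_error are
// illustrative stand-ins, not MXNet's actual definitions.
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

namespace {
std::string last_error;  // stand-in for a saved "last error" message
}

#define API_BEGIN() try {
#define API_END()                       \
  }                                     \
  catch (const std::exception& e) {     \
    last_error = e.what();              \
    return -1;                          \
  }                                     \
  return 0;

// An extern "C"-style entry point: exceptions thrown inside the body are
// caught at the boundary and surfaced as a return code.
extern "C" int horovod_like_op(bool fail) {
  API_BEGIN();
  if (fail) {
    throw std::runtime_error("Horovod operation failed");
  }
  API_END();
}

int main() {
  std::cout << "ok call returned " << horovod_like_op(false) << "\n";
  int rc = horovod_like_op(true);
  std::cout << "failing call returned " << rc
            << " with message: " << last_error << "\n";
  return 0;
}
```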

6 changes: 3 additions & 3 deletions horovod/mxnet/mpi_ops.py
@@ -18,12 +18,11 @@
from __future__ import print_function

# Load all the necessary MXNet C types.
import mxnet as mx
import ctypes
import os

from mxnet.base import c_str_array, c_handle_array, c_array, c_array_buf, c_str
from mxnet.base import check_call, string_types, mx_uint, py_str, string_types
import mxnet as mx
from mxnet.base import c_str, check_call, string_types

from horovod.common import get_ext_suffix
from horovod.common import HorovodBasics as _HorovodBasics
@@ -77,6 +76,7 @@ def allreduce(tensor, average=True, name=None):
else:
check_call(MPI_MXNET_LIB_CTYPES.horovod_mxnet_allreduce_async(c_in,
c_out, name, ctypes.c_bool(average)))

return output

