Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Add examples of running MXNet with Horovod #14286

Merged
merged 12 commits into from
Mar 22, 2019
201 changes: 201 additions & 0 deletions example/distributed_training-horovod/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
<!--- Licensed to the Apache Software Foundation (ASF) under one -->
<!--- or more contributor license agreements. See the NOTICE file -->
<!--- distributed with this work for additional information -->
<!--- regarding copyright ownership. The ASF licenses this file -->
<!--- to you under the Apache License, Version 2.0 (the -->
<!--- "License"); you may not use this file except in compliance -->
<!--- with the License. You may obtain a copy of the License at -->

<!--- http://www.apache.org/licenses/LICENSE-2.0 -->

<!--- Unless required by applicable law or agreed to in writing, -->
<!--- software distributed under the License is distributed on an -->
<!--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -->
<!--- KIND, either express or implied. See the License for the -->
<!--- specific language governing permissions and limitations -->
<!--- under the License. -->

# Distributed Training using MXNet with Horovod
[Horovod](/~https://github.com/horovod/horovod) is a distributed training framework that demonstrates
excellent scaling efficiency for dense models running on a large number of nodes. It currently
supports mainstream deep learning frameworks such as MXNet, TensorFlow, Keras, and PyTorch.
It is created at Uber and currently hosted by the [Linux Foundation Deep Learning](https://lfdl.io)(LF DL).

MXNet is supported in Horovod 0.16.0 [release](https://eng.uber.com/horovod-pyspark-apache-mxnet-support/).

## What's New?
Compared with the standard distributed training script in MXNet which uses parameter server to
distribute and aggregate parameters, Horovod uses ring allreduce and/or tree-based allreduce algorithm
to communicate parameters between workers. There is no dedicated server and the communication data size
between workers does not depend on the number of workers. Therefore, it scales well in the case where
there are a large number of workers and network bandwidth is the bottleneck.

# Install
## Install MXNet
```bash
$ pip install mxnet
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mention that 1.4.0 mkldnn packages do not work with horovod 0.16.0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MXNet pip package does not contain mkldnn by default in 1.4.0. I think it is okay here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to mention it in the Install MXNet section. Here we just use mxnet package as an example. Users may choose their own packages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

```
**Note**: There is a [known issue](/~https://github.com/horovod/horovod/issues/884) when running Horovod with MXNet on a Linux system with GCC version 5.X and above. We recommend users to build MXNet from source following this [guide](https://mxnet.incubator.apache.org/install/build_from_source.html) as a workaround for now. Also mxnet-mkl package in 1.4.0 release does not support Horovod.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so currently pip install doesn't work for this use case ? Is this glibc incompatibility ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It's not because of glibc incompability but due to the GCC4 and GCC5 std::function signature change. In MXNet-Horovod integration, we passed a std::function as callback from Horovod to MXNet. When Horovod and MXNet are built with different GCC versions, segmentation fault will occurr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are the pips built ? for which gcc version ? does pip have this issue currently ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MXNet pip is built with gcc4. If user builds Horovod on centos7/ubuntu14.0, there will be no issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these steps dont currently work. I would suggest changing this to easiest path currently available : 1. build mxnet with gcc 5 followed by pip install horovod OR 2. pip install mxnet followed by build horovod with gcc4 build. I feel 1 is easier for users. When we fix this bug then we can modify documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which platform are you installing?
The following steps in the README work for me on both MacOS and Amazon Linux and Centos 7 (all gcc4)

pip install mxnet
pip install horovod

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm! i think i misunderstood earlier.


## Install Horovod
```bash
$ pip install horovod
```

This basic installation is good for laptops and for getting to know Horovod.
If you're installing Horovod on a server with GPUs, read the [Horovod on GPU](/~https://github.com/horovod/horovod/blob/master/docs/gpus.md) page.
If you want to use Docker, read the [Horovod in Docker](/~https://github.com/horovod/horovod/blob/master/docs/docker.md) page.

## Install MPI
MPI is required to run distributed training with Horovod. Install [Open MPI](https://www.open-mpi.org/) or another MPI implementation.
Steps to install Open MPI are listed [here](https://www.open-mpi.org/faq/?category=building#easy-build).

**Note**: Open MPI 3.1.3 has an issue that may cause hangs. It is recommended
to downgrade to Open MPI 3.1.2 or upgrade to Open MPI 4.0.0.

# Usage

To run MXNet with Horovod, make the following additions to your training script:

1. Run `hvd.init()`.

2. Pin the context to a processor using `hvd.local_rank()`.
Typically, each Horovod worker is associated with one process. The local rank is a unique ID specifically
for all processes running Horovod job on the same node.

3. Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by
the number of workers. An increase in learning rate compensates for the increased batch size.

4. Wrap optimizer in `hvd.DistributedOptimizer`. The distributed optimizer delegates gradient computation
to the original optimizer, averages gradients using *allreduce* or *allgather*, and then applies those averaged
gradients.

5. Add `hvd.broadcast_parameters` to broadcast initial variable states from rank 0 to all other processes.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or
restored from a checkpoint.

# Example

Here we provide the building blocks to train a model using MXNet with Horovod.
The full examples are in [MNIST](gluon_mnist.py) and [ImageNet](resnet50_imagenet.py).

## Gluon API
```python
from mxnet import autograd, gluon
import mxnet as mx
import horovod.mxnet as hvd

# Initialize Horovod
hvd.init()

# Set context to current process
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think args is not defined yet. Maybe context.num_gpus()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a code skeleton to showcase the usage. The args is defined in the real example.


num_workers = hvd.size()

# Build model
model = ...
model.hybridize()

# Define hyper parameters
optimizer_params = ...

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
opt = hvd.DistributedOptimizer(opt)

# Initialize parameters
model.initialize(initializer, ctx=context)

# Fetch and broadcast parameters
params = model.collect_params()
if params is not None:
hvd.broadcast_parameters(params, root_rank=0)

# Create trainer and loss function
trainer = gluon.Trainer(params, opt, kvstore=None)
loss_fn = ...

# Train model
for epoch in range(num_epoch):
train_data.reset()
for nbatch, batch in enumerate(train_data, start=1):
data = batch.data[0].as_in_context(context)
label = batch.label[0].as_in_context(context)
with autograd.record():
output = model(data.astype(dtype, copy=False))
loss = loss_fn(output, label)
loss.backward()
trainer.step(batch_size)
```

## Module API
```python
import mxnet as mx
import horovod.mxnet as hvd

# Initialize Horovod
hvd.init()

# Set context to current process
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank())
num_workers = hvd.size()

# Build model
model = ...

# Define hyper parameters
optimizer_params = ...

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
opt = hvd.DistributedOptimizer(opt)

# Initialize parameters
initializer = mx.init.Xavier(rnd_type='gaussian', factor_type="in",
magnitude=2)
model.bind(data_shapes=train_data.provide_data,
label_shapes=train_data.provide_label)
model.init_params(initializer)

# Fetch and broadcast parameters
(arg_params, aux_params) = model.get_params()
if arg_params:
hvd.broadcast_parameters(arg_params, root_rank=0)
if aux_params:
hvd.broadcast_parameters(aux_params, root_rank=0)
model.set_params(arg_params=arg_params, aux_params=aux_params)

# Train model
model.fit(train_data,
kvstore=None,
optimizer=opt,
num_epoch=num_epoch)
```


# Running Horovod

The example commands below show how to run distributed training. See the
[Running Horovod](/~https://github.com/horovod/horovod/blob/master/docs/running.md)
page for more instructions, including RoCE/InfiniBand tweaks and tips for dealing with hangs.

1. To run on a machine with 4 CPUs:

```bash
$ mpirun -np 4 \
-H localhost:4 \
-bind-to none -map-by slot \
python train.py
```

2. To run on 2 machines with 4 GPUs each:

```bash
$ mpirun -np 8 \
-H server1:4,server2:4 \
-bind-to none -map-by slot \
-x NCCL_DEBUG=INFO \
-mca pml ob1 -mca btl ^openib \
python train.py
```
186 changes: 186 additions & 0 deletions example/distributed_training-horovod/gluon_mnist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse
import logging
import os
import zipfile
import time

import mxnet as mx
import horovod.mxnet as hvd
from mxnet import autograd, gluon, nd
from mxnet.test_utils import download

# Training settings
parser = argparse.ArgumentParser(description='MXNet MNIST Example')

parser.add_argument('--batch-size', type=int, default=64,
help='training batch size (default: 64)')
parser.add_argument('--dtype', type=str, default='float32',
help='training data type (default: float32)')
parser.add_argument('--epochs', type=int, default=5,
help='number of training epochs (default: 5)')
parser.add_argument('--lr', type=float, default=0.01,
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum (default: 0.9)')
parser.add_argument('--use-gpu', action='store_true', default=False,
help='run training on GPU (default: False)')
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)
logging.info(args)


# Function to get mnist iterator given a rank
def get_mnist_iterator(rank):
data_dir = "data-%d" % rank
if not os.path.isdir(data_dir):
os.makedirs(data_dir)
zip_file_path = download('http://data.mxnet.io/mxnet/data/mnist.zip',
dirname=data_dir)
with zipfile.ZipFile(zip_file_path) as zf:
zf.extractall(data_dir)

input_shape = (1, 28, 28)
batch_size = args.batch_size

train_iter = mx.io.MNISTIter(
image="%s/train-images-idx3-ubyte" % data_dir,
label="%s/train-labels-idx1-ubyte" % data_dir,
input_shape=input_shape,
batch_size=batch_size,
shuffle=True,
flat=False,
num_parts=hvd.size(),
part_index=hvd.rank()
)

val_iter = mx.io.MNISTIter(
image="%s/t10k-images-idx3-ubyte" % data_dir,
label="%s/t10k-labels-idx1-ubyte" % data_dir,
input_shape=input_shape,
batch_size=batch_size,
flat=False,
)

return train_iter, val_iter


# Function to define neural network
def conv_nets():
net = gluon.nn.HybridSequential()
with net.name_scope():
net.add(gluon.nn.Conv2D(channels=20, kernel_size=5, activation='relu'))
net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2))
net.add(gluon.nn.Conv2D(channels=50, kernel_size=5, activation='relu'))
net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2))
net.add(gluon.nn.Flatten())
net.add(gluon.nn.Dense(512, activation="relu"))
net.add(gluon.nn.Dense(10))
return net


# Function to evaluate accuracy for a model
def evaluate(model, data_iter, context):
data_iter.reset()
metric = mx.metric.Accuracy()
for _, batch in enumerate(data_iter):
data = batch.data[0].as_in_context(context)
label = batch.label[0].as_in_context(context)
output = model(data.astype(args.dtype, copy=False))
metric.update([label], [output])

return metric.get()


# Initialize Horovod
hvd.init()

# Horovod: pin context to local rank
context = mx.gpu(hvd.local_rank()) if args.use_gpu else mx.cpu(hvd.local_rank())
num_workers = hvd.size()

# Load training and validation data
train_data, val_data = get_mnist_iterator(hvd.rank())

# Build model
model = conv_nets()
model.cast(args.dtype)
model.hybridize()

# Define hyper parameters
optimizer_params = {'momentum': args.momentum,
'learning_rate': args.lr * hvd.size(),
'rescale_grad': 1.0 / args.batch_size}

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
opt = hvd.DistributedOptimizer(opt)

# Initialize parameters
initializer = mx.init.Xavier(rnd_type='gaussian', factor_type="in",
magnitude=2)
model.initialize(initializer, ctx=context)

# Fetch and broadcast parameters
params = model.collect_params()
if params is not None:
hvd.broadcast_parameters(params, root_rank=0)

# Create trainer, loss function and train metric
trainer = gluon.Trainer(params, opt, kvstore=None)
loss_fn = gluon.loss.SoftmaxCrossEntropyLoss()
metric = mx.metric.Accuracy()

# Train model
for epoch in range(args.epochs):
tic = time.time()
train_data.reset()
metric.reset()
for nbatch, batch in enumerate(train_data, start=1):
data = batch.data[0].as_in_context(context)
label = batch.label[0].as_in_context(context)
with autograd.record():
output = model(data.astype(args.dtype, copy=False))
loss = loss_fn(output, label)
loss.backward()
trainer.step(args.batch_size)
metric.update([label], [output])

if nbatch % 100 == 0:
name, acc = metric.get()
logging.info('[Epoch %d Batch %d] Training: %s=%f' %
(epoch, nbatch, name, acc))

if hvd.rank() == 0:
elapsed = time.time() - tic
speed = nbatch * args.batch_size * hvd.size() / elapsed
logging.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f',
epoch, speed, elapsed)

# Evaluate model accuracy
_, train_acc = metric.get()
name, val_acc = evaluate(model, val_data, context)
if hvd.rank() == 0:
logging.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name,
train_acc, name, val_acc)

if hvd.rank() == 0 and epoch == args.epochs - 1:
assert val_acc > 0.96, "Achieved accuracy (%f) is lower than expected\
(0.96)" % val_acc
Loading