Skip to content

Commit

Permalink
Add examples of running MXNet with Horovod (apache#14286)
Browse files Browse the repository at this point in the history
* Add examples for MXNet with Horovod

* update readme

* update examples

* update README

* update mnist_module example

* Update README

* update README

* update README

* update README
  • Loading branch information
apeforest authored and haohuw committed Jun 23, 2019
1 parent 589069f commit e92001e
Show file tree
Hide file tree
Showing 4 changed files with 1,002 additions and 0 deletions.
201 changes: 201 additions & 0 deletions example/distributed_training-horovod/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
<!--- Licensed to the Apache Software Foundation (ASF) under one -->
<!--- or more contributor license agreements. See the NOTICE file -->
<!--- distributed with this work for additional information -->
<!--- regarding copyright ownership. The ASF licenses this file -->
<!--- to you under the Apache License, Version 2.0 (the -->
<!--- "License"); you may not use this file except in compliance -->
<!--- with the License. You may obtain a copy of the License at -->

<!--- http://www.apache.org/licenses/LICENSE-2.0 -->

<!--- Unless required by applicable law or agreed to in writing, -->
<!--- software distributed under the License is distributed on an -->
<!--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -->
<!--- KIND, either express or implied. See the License for the -->
<!--- specific language governing permissions and limitations -->
<!--- under the License. -->

# Distributed Training using MXNet with Horovod
[Horovod](/~https://github.com/horovod/horovod) is a distributed training framework that demonstrates
excellent scaling efficiency for dense models running on a large number of nodes. It currently
supports mainstream deep learning frameworks such as MXNet, TensorFlow, Keras, and PyTorch.
It is created at Uber and currently hosted by the [Linux Foundation Deep Learning](https://lfdl.io)(LF DL).

MXNet is supported in Horovod 0.16.0 [release](https://eng.uber.com/horovod-pyspark-apache-mxnet-support/).

## What's New?
Compared with the standard distributed training script in MXNet which uses parameter server to
distribute and aggregate parameters, Horovod uses ring allreduce and/or tree-based allreduce algorithm
to communicate parameters between workers. There is no dedicated server and the communication data size
between workers does not depend on the number of workers. Therefore, it scales well in the case where
there are a large number of workers and network bandwidth is the bottleneck.

# Install
## Install MXNet
```bash
$ pip install mxnet
```
**Note**: There is a [known issue](/~https://github.com/horovod/horovod/issues/884) when running Horovod with MXNet on a Linux system with GCC version 5.X and above. We recommend users to build MXNet from source following this [guide](https://mxnet.incubator.apache.org/install/build_from_source.html) as a workaround for now. Also mxnet-mkl package in 1.4.0 release does not support Horovod.

## Install Horovod
```bash
$ pip install horovod
```

This basic installation is good for laptops and for getting to know Horovod.
If you're installing Horovod on a server with GPUs, read the [Horovod on GPU](/~https://github.com/horovod/horovod/blob/master/docs/gpus.md) page.
If you want to use Docker, read the [Horovod in Docker](/~https://github.com/horovod/horovod/blob/master/docs/docker.md) page.

## Install MPI
MPI is required to run distributed training with Horovod. Install [Open MPI](https://www.open-mpi.org/) or another MPI implementation.
Steps to install Open MPI are listed [here](https://www.open-mpi.org/faq/?category=building#easy-build).

**Note**: Open MPI 3.1.3 has an issue that may cause hangs. It is recommended
to downgrade to Open MPI 3.1.2 or upgrade to Open MPI 4.0.0.

# Usage

To run MXNet with Horovod, make the following additions to your training script:

1. Run `hvd.init()`.

2. Pin the context to a processor using `hvd.local_rank()`.
Typically, each Horovod worker is associated with one process. The local rank is a unique ID specifically
for all processes running Horovod job on the same node.

3. Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by
the number of workers. An increase in learning rate compensates for the increased batch size.

4. Wrap optimizer in `hvd.DistributedOptimizer`. The distributed optimizer delegates gradient computation
to the original optimizer, averages gradients using *allreduce* or *allgather*, and then applies those averaged
gradients.

5. Add `hvd.broadcast_parameters` to broadcast initial variable states from rank 0 to all other processes.
This is necessary to ensure consistent initialization of all workers when training is started with random weights or
restored from a checkpoint.

# Example

Here we provide the building blocks to train a model using MXNet with Horovod.
The full examples are in [MNIST](gluon_mnist.py) and [ImageNet](resnet50_imagenet.py).

## Gluon API
```python
from mxnet import autograd, gluon
import mxnet as mx
import horovod.mxnet as hvd

# Initialize Horovod
hvd.init()

# Set context to current process
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank())

num_workers = hvd.size()

# Build model
model = ...
model.hybridize()

# Define hyper parameters
optimizer_params = ...

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
opt = hvd.DistributedOptimizer(opt)

# Initialize parameters
model.initialize(initializer, ctx=context)

# Fetch and broadcast parameters
params = model.collect_params()
if params is not None:
hvd.broadcast_parameters(params, root_rank=0)

# Create trainer and loss function
trainer = gluon.Trainer(params, opt, kvstore=None)
loss_fn = ...

# Train model
for epoch in range(num_epoch):
train_data.reset()
for nbatch, batch in enumerate(train_data, start=1):
data = batch.data[0].as_in_context(context)
label = batch.label[0].as_in_context(context)
with autograd.record():
output = model(data.astype(dtype, copy=False))
loss = loss_fn(output, label)
loss.backward()
trainer.step(batch_size)
```

## Module API
```python
import mxnet as mx
import horovod.mxnet as hvd

# Initialize Horovod
hvd.init()

# Set context to current process
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank())
num_workers = hvd.size()

# Build model
model = ...

# Define hyper parameters
optimizer_params = ...

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
opt = hvd.DistributedOptimizer(opt)

# Initialize parameters
initializer = mx.init.Xavier(rnd_type='gaussian', factor_type="in",
magnitude=2)
model.bind(data_shapes=train_data.provide_data,
label_shapes=train_data.provide_label)
model.init_params(initializer)

# Fetch and broadcast parameters
(arg_params, aux_params) = model.get_params()
if arg_params:
hvd.broadcast_parameters(arg_params, root_rank=0)
if aux_params:
hvd.broadcast_parameters(aux_params, root_rank=0)
model.set_params(arg_params=arg_params, aux_params=aux_params)

# Train model
model.fit(train_data,
kvstore=None,
optimizer=opt,
num_epoch=num_epoch)
```


# Running Horovod

The example commands below show how to run distributed training. See the
[Running Horovod](/~https://github.com/horovod/horovod/blob/master/docs/running.md)
page for more instructions, including RoCE/InfiniBand tweaks and tips for dealing with hangs.

1. To run on a machine with 4 CPUs:

```bash
$ mpirun -np 4 \
-H localhost:4 \
-bind-to none -map-by slot \
python train.py
```

2. To run on 2 machines with 4 GPUs each:

```bash
$ mpirun -np 8 \
-H server1:4,server2:4 \
-bind-to none -map-by slot \
-x NCCL_DEBUG=INFO \
-mca pml ob1 -mca btl ^openib \
python train.py
```
186 changes: 186 additions & 0 deletions example/distributed_training-horovod/gluon_mnist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse
import logging
import os
import zipfile
import time

import mxnet as mx
import horovod.mxnet as hvd
from mxnet import autograd, gluon, nd
from mxnet.test_utils import download

# Training settings
parser = argparse.ArgumentParser(description='MXNet MNIST Example')

parser.add_argument('--batch-size', type=int, default=64,
help='training batch size (default: 64)')
parser.add_argument('--dtype', type=str, default='float32',
help='training data type (default: float32)')
parser.add_argument('--epochs', type=int, default=5,
help='number of training epochs (default: 5)')
parser.add_argument('--lr', type=float, default=0.01,
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum (default: 0.9)')
parser.add_argument('--use-gpu', action='store_true', default=False,
help='run training on GPU (default: False)')
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)
logging.info(args)


# Function to get mnist iterator given a rank
def get_mnist_iterator(rank):
data_dir = "data-%d" % rank
if not os.path.isdir(data_dir):
os.makedirs(data_dir)
zip_file_path = download('http://data.mxnet.io/mxnet/data/mnist.zip',
dirname=data_dir)
with zipfile.ZipFile(zip_file_path) as zf:
zf.extractall(data_dir)

input_shape = (1, 28, 28)
batch_size = args.batch_size

train_iter = mx.io.MNISTIter(
image="%s/train-images-idx3-ubyte" % data_dir,
label="%s/train-labels-idx1-ubyte" % data_dir,
input_shape=input_shape,
batch_size=batch_size,
shuffle=True,
flat=False,
num_parts=hvd.size(),
part_index=hvd.rank()
)

val_iter = mx.io.MNISTIter(
image="%s/t10k-images-idx3-ubyte" % data_dir,
label="%s/t10k-labels-idx1-ubyte" % data_dir,
input_shape=input_shape,
batch_size=batch_size,
flat=False,
)

return train_iter, val_iter


# Function to define neural network
def conv_nets():
net = gluon.nn.HybridSequential()
with net.name_scope():
net.add(gluon.nn.Conv2D(channels=20, kernel_size=5, activation='relu'))
net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2))
net.add(gluon.nn.Conv2D(channels=50, kernel_size=5, activation='relu'))
net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2))
net.add(gluon.nn.Flatten())
net.add(gluon.nn.Dense(512, activation="relu"))
net.add(gluon.nn.Dense(10))
return net


# Function to evaluate accuracy for a model
def evaluate(model, data_iter, context):
data_iter.reset()
metric = mx.metric.Accuracy()
for _, batch in enumerate(data_iter):
data = batch.data[0].as_in_context(context)
label = batch.label[0].as_in_context(context)
output = model(data.astype(args.dtype, copy=False))
metric.update([label], [output])

return metric.get()


# Initialize Horovod
hvd.init()

# Horovod: pin context to local rank
context = mx.gpu(hvd.local_rank()) if args.use_gpu else mx.cpu(hvd.local_rank())
num_workers = hvd.size()

# Load training and validation data
train_data, val_data = get_mnist_iterator(hvd.rank())

# Build model
model = conv_nets()
model.cast(args.dtype)
model.hybridize()

# Define hyper parameters
optimizer_params = {'momentum': args.momentum,
'learning_rate': args.lr * hvd.size(),
'rescale_grad': 1.0 / args.batch_size}

# Add Horovod Distributed Optimizer
opt = mx.optimizer.create('sgd', **optimizer_params)
opt = hvd.DistributedOptimizer(opt)

# Initialize parameters
initializer = mx.init.Xavier(rnd_type='gaussian', factor_type="in",
magnitude=2)
model.initialize(initializer, ctx=context)

# Fetch and broadcast parameters
params = model.collect_params()
if params is not None:
hvd.broadcast_parameters(params, root_rank=0)

# Create trainer, loss function and train metric
trainer = gluon.Trainer(params, opt, kvstore=None)
loss_fn = gluon.loss.SoftmaxCrossEntropyLoss()
metric = mx.metric.Accuracy()

# Train model
for epoch in range(args.epochs):
tic = time.time()
train_data.reset()
metric.reset()
for nbatch, batch in enumerate(train_data, start=1):
data = batch.data[0].as_in_context(context)
label = batch.label[0].as_in_context(context)
with autograd.record():
output = model(data.astype(args.dtype, copy=False))
loss = loss_fn(output, label)
loss.backward()
trainer.step(args.batch_size)
metric.update([label], [output])

if nbatch % 100 == 0:
name, acc = metric.get()
logging.info('[Epoch %d Batch %d] Training: %s=%f' %
(epoch, nbatch, name, acc))

if hvd.rank() == 0:
elapsed = time.time() - tic
speed = nbatch * args.batch_size * hvd.size() / elapsed
logging.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f',
epoch, speed, elapsed)

# Evaluate model accuracy
_, train_acc = metric.get()
name, val_acc = evaluate(model, val_data, context)
if hvd.rank() == 0:
logging.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name,
train_acc, name, val_acc)

if hvd.rank() == 0 and epoch == args.epochs - 1:
assert val_acc > 0.96, "Achieved accuracy (%f) is lower than expected\
(0.96)" % val_acc
Loading

0 comments on commit e92001e

Please sign in to comment.