support offload in sharding stage2 #37904

Merged: 11 commits merged on Dec 9, 2021
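For orientation, here is a hedged usage sketch of what this PR enables: keeping the stage-2 sharded optimizer's fp32 master weights on the CPU and running the update step there by passing offload=True. The import paths, class names, and constructor signatures below are assumptions based on the files this PR touches and on Paddle's dygraph sharding examples from the same period; they are not taken from the diff itself.

# Hedged sketch, not part of this PR: names and signatures below are assumptions.
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimizer_stage2 import ShardingOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ShardingStage2

fleet.init(is_collective=True)
group = paddle.distributed.new_group(
    list(range(paddle.distributed.get_world_size())))

model = paddle.nn.Linear(8, 8)  # placeholder layer
optimizer = paddle.optimizer.AdamW(
    learning_rate=1e-3, parameters=model.parameters())
# Offload requires Adam/AdamW/Momentum with AMP or pure fp16 (see the assert below).
model, optimizer = paddle.amp.decorate(
    models=model, optimizers=optimizer, level='O2')

# offload=True keeps fp32 master weights on the CPU and runs optimizer.step() there.
optimizer = ShardingOptimizerStage2(
    params=model.parameters(), optim=optimizer, group=group, offload=True)
model = ShardingStage2(model, optimizer, group=group)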
@@ -27,11 +27,13 @@
import paddle
import paddle.fluid as fluid
from paddle import framework
from paddle.fluid import core
import paddle.distributed as dist
from paddle.optimizer import Optimizer
from paddle.fluid.clip import ClipGradByGlobalNorm

from ...utils.internal_storage import ParamStorage
from ...meta_parallel.sharding.sharding_utils import Type
from ...meta_parallel.sharding.sharding_utils import Type, device_guard, ShardingClipGrad

# CUDA alignment 256 bytes
alignment = {"gpu": 256, }
@@ -99,16 +101,41 @@ def __init__(self,

self.broadcast_fp16 = broadcast_fp16
self.param_storages = {} # {dtype: {rank: InternalStorage}}

if isinstance(self._optim._grad_clip, ClipGradByGlobalNorm):
logging.warning(
"While using ClipGradByGlobalNorm in ShardingOptimizer, the grad clip of original optimizer will be changed."
)
self._optim._grad_clip = ShardingClipGrad(self._optim._grad_clip,
group,
paddle.get_device())

if offload:
assert self._pfp16, "Only support offload strategy while using \'Adam\', \'AdamW\' and \'Momentum\' optimizer with AMP/Pure FP16"

self.offload = offload # Using for offload
self.offload_device = "cpu"

self._master_params = {}

# Update optimizer parameters and adjust parameter storage and use according to rank.
self.update_opt_status()

def _generate_master_params(self, trainable_params):
for param in trainable_params:
if param.dtype == Type.fp16.value:
self._optim._master_weights[param.name] = paddle.cast(
param, Type.fp32.value)
if self.offload:
for param in trainable_params:
if param.name not in self._master_params.keys():
self._master_params[param.name] = core.VarBase(
name=param.name,
value=param.cast(dtype=Type.fp32.value).numpy(),
place=core.CPUPlace(),
stop_gradient=param.stop_gradient)

Contributor: Please change this one to .value().get_tensor() as well.

Contributor Author: OK.
self._optim._master_weights = self._master_params
else:
for param in trainable_params:
if param.dtype == Type.fp16.value:
self._optim._master_weights[param.name] = paddle.cast(
param, Type.fp32.value)
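The review exchange inside _generate_master_params above asks for the value=... argument to go through .value().get_tensor() rather than a .numpy() copy when building the CPU master weight. Below is a minimal sketch of that direction; whether core.VarBase accepts the underlying framework tensor for value (instead of a numpy array) is an assumption here, not something this diff confirms.

# Hedged sketch of the reviewer's suggestion, not the merged code.
# Assumption: core.VarBase can be constructed from the dense tensor returned by
# .value().get_tensor(), avoiding the intermediate numpy() copy.
cpu_fp32 = param.cast(dtype=Type.fp32.value).cpu()
self._master_params[param.name] = core.VarBase(
    name=param.name,
    value=cpu_fp32.value().get_tensor(),
    place=core.CPUPlace(),
    stop_gradient=param.stop_gradient)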

def update_opt_status(self):
"""Update optimizer status and parameter storage information, and special functions to be developed.
Expand Down Expand Up @@ -243,22 +270,43 @@ def step(self):
A wrapper for Optimizer's step function to finish the update operation of the optimizer.
"""

# Synchronize optimizer parameters for the current rank
if len(self.dtype_rank_params.keys(
)) == 1 and Type.fp32.value in self.dtype_rank_params.keys():
self._optim._parameter_list = self.dtype_rank_params[
Type.fp32.value][self.rank]
elif len(self.dtype_rank_params.keys(
)) == 1 and Type.fp16.value in self.dtype_rank_params.keys():
self._optim._parameter_list = self.dtype_rank_params[
Type.fp16.value][self.rank]
if self.offload:
self._optim._parameter_list = [
param for name, param in self._master_params.items()
]
else:
self._optim._parameter_list = self.dtype_rank_params[
Type.fp16.value][self.rank] + self.dtype_rank_params[
# Synchronize optimizer parameters for the current rank
if len(self.dtype_rank_params.keys(
)) == 1 and Type.fp32.value in self.dtype_rank_params.keys():
self._optim._parameter_list = self.dtype_rank_params[
Type.fp32.value][self.rank]
elif len(self.dtype_rank_params.keys(
)) == 1 and Type.fp16.value in self.dtype_rank_params.keys():
self._optim._parameter_list = self.dtype_rank_params[
Type.fp16.value][self.rank]
else:
self._optim._parameter_list = self.dtype_rank_params[
Type.fp16.value][self.rank] + self.dtype_rank_params[
Type.fp32.value][self.rank]

# Run the optimizer of the current rank step
self._optim.step()
if self.offload:
with device_guard(self.rank, self.offload_device):
self._optim.step()

for param in self._optim._parameter_list:
self._master_params[param.name].set_value(param)

dev_id = 0 if paddle.get_device() == "cpu" else int(
paddle.get_device().split(":")[1])

for param in self._local_params:
if param.name in self._master_params.keys():
param.set_value(self._master_params[param.name].cuda(dev_id)
.cast(dtype=param.dtype))
self._master_params[param.name].clear_gradient(False)

Member: This will increase GPU memory here; you need to release the param first and then share_data the master parameter.

Contributor Author: OK.
else:
self._optim.step()

# Synchronize all the updated shards in between the ranks
self._broadcast_params()
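The review exchange in the offload branch of step() above points out that copying the fp32 master weight back with .cuda(dev_id) while the fp16 parameter still holds its device storage briefly doubles GPU memory for that parameter. Below is a minimal sketch of the suggested fix, releasing the parameter's storage before writing the updated value back; the exact sequencing, and whether set_value can repopulate a cleared tensor, are assumptions rather than the merged fix.

# Hedged sketch, not the merged code: free the stale fp16 storage first so the
# fp16 parameter and its fp32 master copy never coexist on the GPU.
for param in self._local_params:
    if param.name in self._master_params.keys():
        # release the parameter's current device storage (same tensor helper
        # used to clear grad storages elsewhere in this PR)
        param.value().get_tensor()._clear()
        # bring the updated master weight back and re-cast to the param dtype
        param.set_value(self._master_params[param.name]
                        .cuda(dev_id).cast(dtype=param.dtype))
        self._master_params[param.name].clear_gradient(False)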
@@ -112,6 +112,18 @@ def __init__(
self._has_grad_storage = []
self._grad_storage_list = []

# offload
# TODO(haohongxiang): Now it's not supported for multi-optimizers using Offload strategy
self._offload_optims = list(
filter(lambda optim: optim.offload, self._sharding_optimizers))
if len(self._offload_optims) > 0:
assert len(
self._sharding_optimizers
) == 1, "Only support offload strategy for single optimizer"

self._offload = self._sharding_optimizers[0].offload
self._offload_device = "cpu"

# Set backward pass hooks
self._bw_hooks = []

@@ -156,7 +168,8 @@ def clear_gradients(self):
# Release grad storages
for dtype in self._grad_storages.keys():
if self._rank in self._grad_storages[dtype].keys():
self._grad_storages[dtype][self._rank].buffer.zero_()
if not self._offload:
self._grad_storages[dtype][self._rank].buffer.zero_()

# Release params
for param in self._trainable_params:
@@ -167,17 +180,24 @@ def grad_scale(self):
"""
Before the gradient accumulation, scale the gradient.
"""
# Scale grad storages
for dtype in self._grad_storages.keys():
if self._rank in self._grad_storages[dtype].keys():
self._grad_storages[dtype][self._rank].buffer.scale_(
scale=self._world_size_scaling)

# Scale params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.grad.scale_(scale=self._world_size_scaling)
param._reset_grad_inplace_version(True)
if self._offload:
for param in self._trainable_params:
if param.name in self._sharding_optimizers[
0]._master_params.keys():
self._sharding_optimizers[0]._master_params[
param.name].grad.scale_(scale=self._world_size_scaling)
else:
# Scale grad storages
for dtype in self._grad_storages.keys():
if self._rank in self._grad_storages[dtype].keys():
self._grad_storages[dtype][self._rank].buffer.scale_(
scale=self._world_size_scaling)

# Scale params
for param in self._trainable_params:
if param.name in self._param_grads and param.grad is not None:
param.grad.scale_(scale=self._world_size_scaling)
param._reset_grad_inplace_version(True)

def _init_internal_storage(self, needs_fresh):
"""
@@ -195,8 +215,14 @@ def to(self, device=None, dtype=None, blocking=True):
"""
Synchronously or asynchronously convert the data type of the layer, the device is not supported now.
"""
assert isinstance(device, str), "Device must be type str"
assert device == self._default_device, "New devices are not supported, because the optimizer state is not synchronized"

self._layer.to(device=device, dtype=dtype, blocking=blocking)

# Re-build the buckets, hooks, etc..
self._fresh_trainable()

def _fresh_trainable(self):
""" Whether to update training parameters. """

@@ -283,12 +309,17 @@ def reduce(*_):
self._grad_reduced[index] = False
if not self._accumulate_grads:
param.grad.scale_(scale=self._world_size_scaling)
param._reset_grad_inplace_version(True)
param._reset_grad_inplace_version(True)

# Clear the gradient that does not belong to the current rank through the callback function
def cleanup():
if dst_rank != self._rank:
param.clear_gradient(False)
elif self._offload:
self._sharding_optimizers[0]._master_params[
param.name]._copy_gradient_from(param.grad.cpu(
).cast(dtype=Type.fp32.value))
param.clear_gradient(False)

# Synchronize the reduce parameter gradient
self._tasks_flow.append(
@@ -339,6 +370,15 @@ def cleanup():

grad_storage.buffer.value().get_tensor()._clear(
)
elif self._offload:
grad_storage.to(device=self._offload_device)
for param in grad_storage._params:
self._sharding_optimizers[0]._master_params[
param.name]._copy_gradient_from(
param.grad.cast(
dtype=Type.fp32.value))
grad_storage.buffer.value().get_tensor()._clear(
)

# Reduce the bucket
grad_storage.sent = True
@@ -478,7 +518,7 @@ def _build_grad_storages(self):
# Rebuild fp16/fp32 grad storages
for dtype in self._grad_storages.keys():
for dst_rank, grad_storage in self._grad_storages[dtype].items():
if dst_rank != self._rank:
if self._offload or dst_rank != self._rank:
grad_storage.manumal_relase()
grad_storage.rebuild()
