[Auto Parallel] elastic support auto parallel re-launch #37523

Merged · 22 commits · Nov 30, 2021
10 changes: 8 additions & 2 deletions python/paddle/distributed/fleet/elastic/__init__.py
@@ -18,14 +18,20 @@
from .manager import ElasticManager
from .manager import ElasticStatus
from .manager import ELASTIC_EXIT_CODE
from .manager import ElasticLevel
from .collective import CollectiveLauncher

from paddle.distributed.fleet.launch_utils import DistributeMode


def enable_elastic(args, distribute_mode):
if distribute_mode != DistributeMode.COLLECTIVE:
return False
#elastic_level = os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL')
#if not elastic_level and (elastic_level != ElasticLevel.FAULT_TOLERANCE and
# elastic_level != ElasticLevel.ELASTIC):
# return False

#if distribute_mode != DistributeMode.COLLECTIVE:
# return False

if not args.elastic_server and not os.getenv('PADDLE_ELASTIC_SERVER'):
return False
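For orientation, a minimal sketch of how the relaxed gate above behaves. The SimpleNamespace stand-in for args and the exact attribute set it carries (job_id, np) are assumptions for illustration; any further checks hidden below this hunk still have to pass.

import os
from types import SimpleNamespace

from paddle.distributed.fleet.elastic import enable_elastic
from paddle.distributed.fleet.launch_utils import DistributeMode

# Hypothetical args: no --elastic_server flag, etcd endpoint supplied via env.
args = SimpleNamespace(elastic_server=None, job_id="job_demo", np=2)
os.environ["PADDLE_ELASTIC_SERVER"] = "127.0.0.1:2379"

print(enable_elastic(args, DistributeMode.PS))          # False: non-collective mode is rejected up front
print(enable_elastic(args, DistributeMode.COLLECTIVE))  # expected True, assuming the remaining checks pass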
44 changes: 6 additions & 38 deletions python/paddle/distributed/fleet/elastic/collective.py
@@ -30,42 +30,10 @@ def __init__(self, args):
def launch(self):
logger.info("collective lauchner launch ...")
args = self.args
# parse arguments, used for cloud-single-machine and local
(device_mode,
devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".
format(trainers_num, device_mode, devices_per_proc))

cluster = None
pod = None

start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
elif device_mode == DeviceMode.ASCEND_NPU:
# for ascend
cluster, pod = ascend_utils.get_cloud_cluster(
rank_table_file=os.getenv("RANK_TABLE_FILE", None),
device_mode=device_mode,
start_port=start_port)
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = paddle.distributed.fleet.launch.get_cluster_from_args(
args, device_mode, devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))

global_envs = copy.copy(os.environ.copy())
self.gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(
os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
self.tmp_dir = tempfile.mkdtemp()
global_envs = paddle.distributed.fleet.launch.get_global_envs(
args, self.tmp_dir)
cluster, pod = paddle.distributed.fleet.launch.get_cluster_info(args)

self.procs = start_local_trainers(
cluster,
@@ -82,8 +50,8 @@ def stop(self):
logger.info("collective lauchner stop ...")
if not self._terminate_procs():
logger.error("kill process failed")
if os.path.exists(self.gloo_rendezvous_dir):
shutil.rmtree(self.gloo_rendezvous_dir)
if os.path.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)

def watch(self):
logger.debug("collective lauchner watch ...")
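For context, a rough sketch of how the elastic layer drives this launcher; the single-shot driver below is illustrative (the real ElasticManager polls watch() in a loop), but the launch()/watch()/stop() interface and the tmp_dir cleanup are the ones shown in the diff above.

from paddle.distributed.fleet.elastic.collective import CollectiveLauncher

def run_once(args):
    # args: the namespace produced by fleet.launch argument parsing (assumed).
    launcher = CollectiveLauncher(args)
    launcher.launch()           # get_cluster_info() + get_global_envs(), then start_local_trainers()
    try:
        ret = launcher.watch()  # poll local trainer processes; exit code 102 propagates up for re-launch
    finally:
        launcher.stop()         # terminate procs and remove launcher.tmp_dir
    return ret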
17 changes: 16 additions & 1 deletion python/paddle/distributed/fleet/elastic/manager.py
@@ -35,6 +35,7 @@
logger.addHandler(ch)

ELASTIC_EXIT_CODE = 101
ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102

# wait for timeout, unit: seconds
ELASTIC_TIMEOUT = 2 * 60
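The new exit code is the contract between a trainer and the elastic agent: exiting with 102 means "re-launch me" (for instance after an auto-parallel re-plan) rather than "I failed". A hedged trainer-side illustration; only the constant's value comes from this diff, the trigger condition is an assumed example.

import sys

ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102  # mirrors the constant added in manager.py

def maybe_request_relaunch(need_new_parallel_plan: bool) -> None:
    # Exiting with 102 makes _check_procs() hand the code back to watch(),
    # which holds the job for re-launch instead of aborting it.
    if need_new_parallel_plan:
        sys.exit(ELASTIC_AUTO_PARALLEL_EXIT_CODE)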
@@ -103,6 +104,9 @@ def _check_procs(self):
if ret is None:
alive = True
elif ret != 0:
if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
logger.info("return form elastic auto parallel re-launch")
return ret
logger.error("ABORT!!! ABORT!!! ABORT!!!")
logger.error(
"ERROR rank {} error with exit code {}, check log for detail.".
@@ -232,6 +236,7 @@ def host_call_back(event):
six.ensure_str(i[0])
for i in self.etcd.get_prefix(self.node_prefix)
]
self.hosts = list(set(self.hosts)) if self.hosts else self.hosts
logger.info(
f"host_call_back curr_host={self.curr_host}, hosts:{self.hosts}")
self.need_sync = True
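A tiny illustration of the de-duplication added here (and in the two later hunks): the host list read back from the etcd prefix scan may contain repeats; the duplicate entry, e.g. a node that re-registered, is a hypothetical example.

hosts = ["10.10.10.1:6170", "10.10.10.2:6170", "10.10.10.1:6170"]  # raw prefix scan (duplicate assumed)
hosts = list(set(hosts)) if hosts else hosts                        # de-dup as added in this PR
assert sorted(hosts) == ["10.10.10.1:6170", "10.10.10.2:6170"]      # note: set() does not preserve order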
@@ -251,6 +256,7 @@ def lease_heartbeat():
six.ensure_str(i[0])
for i in self.etcd.get_prefix(self.node_prefix)
]
hosts = list(set(hosts)) if hosts else hosts
logger.info(
f"[lease_heartbeat] curr_host={self.curr_host}, hosts={hosts}"
)
@@ -335,6 +341,7 @@ def pre_hook(self):
if not self.args.elastic_pre_hook:
logger.info("skip pre_hook")
return
logger.info("execute pre_hook...")
current_env = copy.copy(os.environ.copy())
out, err = subprocess.Popen(
self.args.elastic_pre_hook,
@@ -391,6 +398,7 @@ def _match(self, host_list: list=None):
six.ensure_str(i[0])
for i in self.etcd.get_prefix(self.node_prefix)
]
self.hosts = list(set(self.hosts)) if self.hosts else self.hosts

if self.elastic_level == ElasticLevel.FAULT_TOLERANCE:
if len(self.hosts) == self.np:
@@ -430,6 +438,9 @@ def _update_endpoint(self, endpoints, hosts):

def _update_fault_tolrance(self):
rank = int(os.getenv('PADDLE_TRAINER_ID', -1))
logger.debug(
f"self.curr_host={self.curr_host}, self.dist_endpoints={self.dist_endpoints}"
)
if self.curr_host in self.dist_endpoints:
os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = self.dist_endpoints
os.environ['PADDLE_TRAINERS'] = self.trainers
@@ -550,7 +561,6 @@ def wait(self):
self.hosts))
idx += 1
time.sleep(2)

return

def run(self, launcher):
@@ -571,6 +581,11 @@ def watch(self):

if ret is not None: # self terminated
logger.info('job exit with code {}'.format(ret))
if ret == ELASTIC_AUTO_PARALLEL_EXIT_CODE:
logger.info('job re-launch for auto parallel')
self.launcher.stop()
return ElasticStatus.HOLD

                    # process completed successfully if ret == 0, otherwise it errored
completed = True if ret == 0 else False
self.exit(completed=completed)
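Why returning ElasticStatus.HOLD causes a re-launch: the outer elastic driver (in fleet/elastic/__init__.py, not part of this diff) keeps looping unless the status is COMPLETED or ERROR. A rough paraphrase under that assumption, not the verbatim driver:

from paddle.distributed.fleet.elastic.collective import CollectiveLauncher
from paddle.distributed.fleet.elastic.manager import ElasticStatus

def elastic_driver(elastic):
    # elastic: an ElasticManager instance constructed elsewhere (assumed).
    while True:
        elastic.wait()                   # block until enough nodes are registered
        elastic.run(CollectiveLauncher)  # start local trainers via the launcher
        ret = elastic.watch()            # ElasticStatus.HOLD when a trainer exits with 102
        if ret == ElasticStatus.COMPLETED:
            elastic.exit(completed=True)
            break
        if ret == ElasticStatus.ERROR:
            elastic.exit()
            break
        # HOLD falls through both checks: loop again and re-launch the trainers.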
20 changes: 15 additions & 5 deletions python/paddle/distributed/fleet/launch.py
@@ -65,6 +65,7 @@
import time
import six
import copy
import pathlib
import argparse
from argparse import ArgumentParser, REMAINDER
import paddle
@@ -283,7 +284,7 @@ def cpuonly_check(args):
return True


def launch_collective(args):
def get_cluster_info(args):
# parse arguments, used for cloud-single-machine and local
if args.backend == 'gloo': cpuonly_check(args)
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
@@ -316,14 +317,23 @@
cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))
return cluster, pod


def get_global_envs(args, tmp_dir):
global_envs = copy.copy(os.environ.copy())
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
global_envs["PADDLE_GLOO_FS_PATH"] = tmp_dir
global_envs["PADDLE_DISTRI_BACKEND"] = args.backend
return global_envs


def launch_collective(args):
tmp_dir = tempfile.mkdtemp()
cluster, pod = get_cluster_info(args)
global_envs = get_global_envs(args, tmp_dir)

procs = start_local_trainers(
cluster,
@@ -352,8 +362,8 @@ def launch_collective(args):
terminate_local_procs(procs)
exit(1)

if os.path.exists(gloo_rendezvous_dir):
shutil.rmtree(gloo_rendezvous_dir)
if os.path.exists(tmp_dir):
shutil.rmtree(tmp_dir)


def launch_ps(args, distribute_mode):
Expand Down
2 changes: 0 additions & 2 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -60,7 +60,6 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_rank_mapping)
list(APPEND MIXED_DIST_TEST_OPS test_ascend_group)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
@@ -665,7 +664,6 @@ if(WITH_DISTRIBUTE)
bash_test_modules(test_fleet_launch_async START_BASH test_fleet_launch_async.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch_rank_mapping START_BASH test_fleet_launch_rank_mapping.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
if(WITH_ASCEND OR WITH_ASCEND_CL)
bash_test_modules(test_fleet_launch_ascend START_BASH test_fleet_launch_ascend.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_ascend_group START_BASH test_ascend_group.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
111 changes: 111 additions & 0 deletions python/paddle/fluid/tests/unittests/test_fleet_elastic_collective.py
@@ -0,0 +1,111 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
import time
import json
import unittest
import argparse
import tempfile
import traceback
from warnings import catch_warnings

from paddle.distributed.fleet.elastic.collective import CollectiveLauncher
from paddle.distributed.fleet.launch import launch_collective

fake_python_code = """
print("test")
"""


class TestCollectiveLauncher(unittest.TestCase):
def setUp(self):
file_dir = os.path.dirname(os.path.abspath(__file__))

self.code_path = os.path.join(file_dir, "fake_python_for_elastic.py")
with open(self.code_path, "w") as f:
f.write(fake_python_code)

def test_launch(self):
class Argument:
elastic_server = "127.0.0.1:2379"
job_id = "test_job_id_123"
np = "1"
gpus = "0"
nproc_per_node = 1
host = None
curr_host = None
ips = "127.0.0.1"
scale = None
force = None
backend = 'gloo'
enable_auto_mapping = False
run_mode = "cpuonly"
servers = None
rank_mapping_path = None
training_script = "fake_python_for_elastic.py"
training_script_args = ["--use_amp false"]
log_dir = None

args = Argument()

launch = CollectiveLauncher(args)

try:
args.backend = "gloo"
launch.launch()
launch.stop()
except Exception as e:
pass

try:
args.backend = "gloo"
launch_collective(args)
except Exception as e:
pass

def test_stop(self):
class Argument:
elastic_server = "127.0.0.1:2379"
job_id = "test_job_id_123"
np = "1"
gpus = "0"
nproc_per_node = 1
host = None
curr_host = None
ips = "127.0.0.1"
scale = None
force = None
backend = 'gloo'
enable_auto_mapping = False
run_mode = "cpuonly"
servers = None
rank_mapping_path = None
training_script = "fake_python_for_elastic.py"
training_script_args = ["--use_amp false"]
log_dir = None

args = Argument()
try:
launch = CollectiveLauncher(args)
launch.tmp_dir = tempfile.mkdtemp()
launch.stop()
except Exception as e:
pass


if __name__ == "__main__":
unittest.main()