fleetrun launch in legacy mode #40568

Merged 9 commits on Mar 21, 2022
@@ -12,74 +12,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .job.container import Container
from .job.pod import Pod
from .job.job import Job
from . import plugins

#__all__ = [Container, Pod, Job]
__all__ = []
'''
Paddle distribution training entry ``python -m paddle.distributed.run``.
Paddle distributed training entry ``python -m paddle.distributed.launch``.

Help

# for arg usage and explanation, try the following command
# python -m paddle.distributed.run -h
# python -m paddle.distributed.launch -h

Collective Mode

Case 1: 1 node

use all visible devices
# python -m paddle.distributed.run train.py
# python -m paddle.distributed.launch train.py

use specified devices
# python -m paddle.distributed.run --devices=0,1,2,3 train.py
# python -m paddle.distributed.launch --devices=0,1,2,3 train.py
Contributor commented on this line:
If the devices are configured through environment variables on the platform, does that follow the new-version logic or the legacy logic? It may be worth checking whether there are compatibility issues.

Member Author replied:
The new version uses --devices uniformly; otherwise the legacy launch is used.


Case 2: multi-node, auto detect ip/port

# python -m paddle.distributed.run --np 2 train.py
# python -m paddle.distributed.launch --nnodes 2 train.py
# auto print following command
# python -m paddle.distributed.run --master 10.0.0.1:13538 --np 2 demo.py
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --nnodes 2 demo.py
# then copy and paste above command to other nodes

Case 3: multi-node, specified master/rendezvous server

# python -m paddle.distributed.run --np 2 --master 10.0.0.1:2379 train.py
# python -m paddle.distributed.launch --nnodes 2 --master 10.0.0.1:2379 train.py
# the master ip must be one of the nodes and the port must be available

Parameter Server Mode

Case 1.1: 1 node, 1 ps, 1 trainer

# python -m paddle.distributed.run --mode ps train.py
# python -m paddle.distributed.run --server_num=1 --trainer_num=1 train.py
# python -m paddle.distributed.launch --mode ps train.py
# python -m paddle.distributed.launch --server_num=1 --trainer_num=1 train.py

Case 1.2: 1 node, 2 ps, 2 trainer

# python -m paddle.distributed.run --server_num=2 --trainer_num=2 train.py
# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 train.py

Case 2: 2 node, 2 ps, 2 trainer per node

# python -m paddle.distributed.run --server_num=2 --trainer_num=2 --np 2 train.py
# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 --nnodes 2 train.py
# auto print following command
# python -m paddle.distributed.run --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --np 2 train.py
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py
# then copy and paste above command to other nodes

Case 3: multi-node, specified master/rendezvous server

# python -m paddle.distributed.run --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --np 2 train.py
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py
# the master ip must be one of the nodes and the port must be available

Case 4: specified servers and trainers in each node

python -m paddle.distributed.run --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py
python -m paddle.distributed.launch --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py


Elastic Mode

# run the following command on 3 nodes to run immediately, or on 2 nodes to run after elastic_timeout
# python -m paddle.distributed.run --master etcd://10.0.0.1:2379 --np 2:3 train.py
# python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2:3 train.py

# once the peer number changes between 2:3, the strategy holds

@@ -15,14 +15,28 @@
from .context import Context
from . import controllers

Removed (old module-level code):

# initialize the context to run
ctx = Context()

# initialize the selected controller
c = controllers.init(ctx)

# run the pods
c.run()

# manager or just wait pod
c.finalize()

Added:

def launch():
    # initialize the context to run
    ctx = Context()

    if ctx.is_legacy_mode():

        # legacy mode
        from paddle.distributed.fleet import launch
        launch.launch()

    else:

        # initialize the selected controller
        c = controllers.init(ctx)

        # run the pods
        c.run()

        # manager or just wait pod
        c.finalize()


if __name__ == "__main__":
    launch()
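For orientation, here is a minimal sketch of driving the new entry point programmatically rather than via python -m; it assumes the launch() function above lives in paddle.distributed.launch.main (the module path is inferred, not shown in this hunk) and that a train.py script exists locally.

import sys
# assumed import path for the launch() entry shown above
from paddle.distributed.launch.main import launch

# equivalent to: python -m paddle.distributed.launch --nnodes 1 train.py
sys.argv = ["launch", "--nnodes", "1", "train.py"]
launch()   # falls back to the legacy fleet launcher only if is_legacy_mode() is True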
88 changes: 88 additions & 0 deletions python/paddle/distributed/launch/context/__init__.py
@@ -0,0 +1,88 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle.distributed.launch import plugins

from .node import Node
from .status import Status
from .args_envs import parse_args, fetch_envs, env_args_mapping

import logging


class Context(object):
    def __init__(self, enable_plugin=True):
        self.args, self.unknown_args = parse_args()
        self.envs = fetch_envs()
        self.logger = self.get_logger()

        self.node = Node()
        self.status = Status()

        self.set_env_in_args()

        # design for event queue, later
        self.events = []

        if enable_plugin:
            self._enable_plugin()

    def is_legacy_mode(self):
        if self.args.legacy:
            return True

        if len(self.unknown_args) > 0:
            self.logger.warning("Compatible mode enable with args {}".format(
                self.unknown_args))
            return True

        legacy_env_list = [
            'DISTRIBUTED_TRAINER_ENDPOINTS',
            'PADDLE_ELASTIC_JOB_ID',
            'PADDLE_DISTRI_BACKEND',
            'FLAGS_START_PORT',
        ]

        for env in legacy_env_list:
            if env in self.envs:
                self.logger.warning(
                    "ENV {} is deprecated, legacy launch enable".format(env))
                return True

        if self.args.master:
            return False

        return False

    def get_envs(self):
        return self.envs.copy()

    def _enable_plugin(self):
        for pl in plugins.enabled_plugins:
            pl(self)

    def get_logger(self, level=logging.INFO):
        logger = logging.getLogger("LAUNCH")
        logger.setLevel(self.args.log_level.upper() or level)
        formatter = logging.Formatter(
            fmt='%(name)s %(levelname)s %(asctime)s %(message)s')
        ch = logging.StreamHandler()
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        return logger

    def set_env_in_args(self):
        for k, v in env_args_mapping.items():
            if k in self.envs:
                setattr(self.args, v, self.envs[k])
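To make the fallback rules above concrete, here is a minimal sketch showing how a deprecated environment variable flips the launcher into legacy mode; it assumes the sibling node/status modules added elsewhere in this PR are importable, and it uses a hypothetical train.py only to satisfy the required positional argument.

import os
import sys
from paddle.distributed.launch.context import Context  # path added by this PR

os.environ["PADDLE_ELASTIC_JOB_ID"] = "job-0"   # one of the legacy_env_list entries
sys.argv = ["launch", "train.py"]               # minimal argv so parse_args() succeeds

ctx = Context(enable_plugin=False)
print(ctx.is_legacy_mode())   # True, after a deprecation warning is logged for the env var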
151 changes: 151 additions & 0 deletions python/paddle/distributed/launch/context/args_envs.py
@@ -0,0 +1,151 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from argparse import ArgumentParser, REMAINDER

env_args_mapping = {
    'POD_IP': 'host',
    'PADDLE_MASTER': 'master',
    'PADDLE_DEVICES': 'devices',
    'PADDLE_NNODES': 'nnodes',
    'PADDLE_MODE': 'mode',
    'PADDLE_LOG_LEVEL': 'log_level',
    'PADDLE_NPROC_PER_NODE': 'nproc_per_node',
    'PADDLE_JOB_ID': 'job_id',
    'PADDLE_RANK': 'rank',
    'PADDLE_LOG_DIR': 'log_dir',
    'PADDLE_MAX_RESTART': 'max_restart',
    'PADDLE_ELASTIC_LEVEL': 'elastic_level',
    'PADDLE_ELASTIC_TIMEOUT': 'elastic_timeout',
    'PADDLE_SERVER_NUM': 'server_num',
    'PADDLE_TRAINER_NUM': 'trainer_num',
    'PADDLE_SERVERS_ENDPOINTS': 'servers',
    'PADDLE_TRAINERS_ENDPOINTS': 'trainers',
    'PADDLE_GLOO_PORT': 'gloo_port',
    'PADDLE_WITH_GLOO': 'with_gloo',
}
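The mapping above is consumed by Context.set_env_in_args() shown earlier, so exporting PADDLE_NNODES=2 has the same effect as passing --nnodes 2 on the command line. A small standalone sketch of that override (the SimpleNamespace stand-in is illustrative, not part of the PR):

import os
from types import SimpleNamespace

env_args_mapping = {"PADDLE_NNODES": "nnodes"}   # excerpt of the table above

args = SimpleNamespace(nnodes="1")               # stand-in for the parsed argparse namespace
envs = dict(os.environ, PADDLE_NNODES="2")       # as returned by fetch_envs()

for k, v in env_args_mapping.items():
    if k in envs:
        setattr(args, v, envs[k])                # the env var overrides the CLI default

print(args.nnodes)   # "2"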


def fetch_envs():
    os.environ.pop('http_proxy', None)
    os.environ.pop('https_proxy', None)

    return os.environ.copy()


def parse_args():
    parser = ArgumentParser()

    base_group = parser.add_argument_group("Base Parameters")

    base_group.add_argument(
        "--master",
        type=str,
        default=None,
        help="the master/rendezvous server, ip:port")

    base_group.add_argument(
        "--legacy", type=bool, default=False, help="use legacy launch")

Contributor commented on this argument:
It would be good to add some explanation for the newly added --legacy argument.

Member Author replied:
OK.

Contributor added:
It could also be noted here that --legacy is only for internal debugging; external developers do not need to care about the difference between the two modes.

    base_group.add_argument(
        "--rank", type=int, default=-1, help="the peer rank")

    base_group.add_argument(
        "--log_level", type=str, default="INFO", help="log level. Default INFO")

    base_group.add_argument(
        "--nnodes",
        type=str,
        default="1",
        help="the number of peers, i.e. pod/node number")

    base_group.add_argument(
        "--nproc_per_node",
        type=int,
        default=None,
        help="the number of processes in a pod")

    base_group.add_argument(
        "--log_dir",
        type=str,
        default="log",
        help="the path for each process's log. Default ./log")
    base_group.add_argument(
        "--mode",
        type=str,
        default="collective",
        help="run mode of the job, collective/ps/ps-heter")

    base_group.add_argument(
        "--job_id",
        type=str,
        default="default",
        help="unique id of the job. Default default")

    base_group.add_argument(
        "--devices",
        type=str,
        default=None,
        help="accelerate devices. as --gpus,npus,xps")

    base_group.add_argument("--host", type=str, default=None, help="host ip")

    base_group.add_argument(
        "training_script",
        type=str,
        help="the full path of py script,"
        "followed by arguments for the "
        "training script")

    base_group.add_argument('training_script_args', nargs=REMAINDER)

    ps_group = parser.add_argument_group("Parameter-Server Parameters")
    # for parameter server
    ps_group.add_argument(
        "--servers", type=str, default='', help="servers endpoints full list")
    ps_group.add_argument(
        "--trainers", type=str, default='', help="trainers endpoints full list")

    ps_group.add_argument(
        "--trainer_num", type=int, default=None, help="number of trainers")
    ps_group.add_argument(
        "--server_num", type=int, default=None, help="number of servers")
    ps_group.add_argument(
        "--gloo_port", type=int, default=6767, help="gloo http port")
    ps_group.add_argument(
        "--with_gloo", type=str, default="0", help="use gloo or not")

    # parameter elastic mode
    elastic_group = parser.add_argument_group("Elastic Parameters")
    elastic_group.add_argument(
        "--max_restart",
        type=int,
        default=3,
        help="the times can restart. Default 3")

    elastic_group.add_argument(
        "--elastic_level",
        type=int,
        default=-1,
        help="elastic level: -1 disable, 0 failed exit, peers hold, 1 internal restart"
    )

    elastic_group.add_argument(
        "--elastic_timeout",
        type=int,
        default=30,
        help="seconds to wait before elastic perform training")

    return parser.parse_known_args()
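Because parse_known_args() is used here, flags the new parser does not recognise are returned instead of raising an error, and Context.is_legacy_mode() treats a non-empty unknown-argument list as the signal to fall back to the legacy launcher. A standalone sketch of that behaviour (--gpus stands in for a legacy-only flag and is deliberately not registered):

from argparse import ArgumentParser, REMAINDER

parser = ArgumentParser()
parser.add_argument("--nnodes", type=str, default="1")
parser.add_argument("training_script", type=str)
parser.add_argument("training_script_args", nargs=REMAINDER)

args, unknown = parser.parse_known_args(["--nnodes=2", "--gpus=0,1", "train.py"])

print(unknown)                 # ['--gpus=0,1'] -> non-empty, so legacy mode would be enabled
print(args.training_script)    # 'train.py'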