diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 43afd034e0..a3a2c0e647 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -11,6 +11,10 @@ in development Added ~~~~~ +* Added service deregistration on shutdown of a service. #5396 + + Contributed by @khushboobhatia01 + * Added possibility to add new values to the KV store via CLI without leaking them to the shell history. #5164 * ``st2.conf`` is now the only place to configure ports for ``st2api``, ``st2auth``, and ``st2stream``. diff --git a/st2actions/st2actions/cmd/actionrunner.py b/st2actions/st2actions/cmd/actionrunner.py index 6aa339115a..76743ab707 100644 --- a/st2actions/st2actions/cmd/actionrunner.py +++ b/st2actions/st2actions/cmd/actionrunner.py @@ -30,10 +30,12 @@ from st2common import log as logging from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service __all__ = ["main"] LOG = logging.getLogger(__name__) +ACTIONRUNNER = "actionrunner" def _setup_sigterm_handler(): @@ -49,7 +51,7 @@ def sigterm_handler(signum=None, frame=None): def _setup(): capabilities = {"name": "actionrunner", "type": "passive"} common_setup( - service="actionrunner", + service=ACTIONRUNNER, config=config, setup_db=True, register_mq_exchanges=True, @@ -75,6 +77,7 @@ def _run_worker(): errors = False try: + deregister_service(service=ACTIONRUNNER) action_worker.shutdown() except: LOG.exception("Unable to shutdown worker.") diff --git a/st2actions/st2actions/cmd/scheduler.py b/st2actions/st2actions/cmd/scheduler.py index b3cd3badc4..465d067f31 100644 --- a/st2actions/st2actions/cmd/scheduler.py +++ b/st2actions/st2actions/cmd/scheduler.py @@ -28,10 +28,12 @@ from st2common import log as logging from st2common.service_setup import teardown as common_teardown from st2common.service_setup import setup as common_setup +from st2common.service_setup import deregister_service __all__ = ["main"] LOG = 
logging.getLogger(__name__) +SCHEDULER = "scheduler" def _setup_sigterm_handler(): @@ -47,7 +49,7 @@ def sigterm_handler(signum=None, frame=None): def _setup(): capabilities = {"name": "scheduler", "type": "passive"} common_setup( - service="scheduler", + service=SCHEDULER, config=config, setup_db=True, register_mq_exchanges=True, @@ -101,6 +103,7 @@ def _run_scheduler(): errors = False try: + deregister_service(service=SCHEDULER) handler.shutdown() entrypoint.shutdown() except: diff --git a/st2actions/st2actions/cmd/st2notifier.py b/st2actions/st2actions/cmd/st2notifier.py index 7f1ccc7222..2ed1be9cec 100644 --- a/st2actions/st2actions/cmd/st2notifier.py +++ b/st2actions/st2actions/cmd/st2notifier.py @@ -25,18 +25,20 @@ from st2common import log as logging from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2actions.notifier import config from st2actions.notifier import notifier __all__ = ["main"] LOG = logging.getLogger(__name__) +NOTIFIER = "notifier" def _setup(): capabilities = {"name": "notifier", "type": "passive"} common_setup( - service="notifier", + service=NOTIFIER, config=config, setup_db=True, register_mq_exchanges=True, @@ -53,6 +55,7 @@ def _run_worker(): actions_notifier.start(wait=True) except (KeyboardInterrupt, SystemExit): LOG.info("(PID=%s) Actions notifier stopped.", os.getpid()) + deregister_service(service=NOTIFIER) actions_notifier.shutdown() return 0 diff --git a/st2actions/st2actions/cmd/workflow_engine.py b/st2actions/st2actions/cmd/workflow_engine.py index f51296b4b0..dba392f100 100644 --- a/st2actions/st2actions/cmd/workflow_engine.py +++ b/st2actions/st2actions/cmd/workflow_engine.py @@ -32,10 +32,12 @@ from st2common import log as logging from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import 
deregister_service __all__ = ["main"] LOG = logging.getLogger(__name__) +WORKFLOW_ENGINE = "workflow_engine" def setup_sigterm_handler(): @@ -51,7 +53,7 @@ def sigterm_handler(signum=None, frame=None): def setup(): capabilities = {"name": "workflowengine", "type": "passive"} common_setup( - service="workflow_engine", + service=WORKFLOW_ENGINE, config=config, setup_db=True, register_mq_exchanges=True, @@ -72,6 +74,7 @@ def run_server(): engine.start(wait=True) except (KeyboardInterrupt, SystemExit): LOG.info("(PID=%s) Workflow engine stopped.", os.getpid()) + deregister_service(service=WORKFLOW_ENGINE) engine.shutdown() except: LOG.exception("(PID=%s) Workflow engine unexpectedly stopped.", os.getpid()) diff --git a/st2api/st2api/cmd/api.py b/st2api/st2api/cmd/api.py index e8cc71e875..12c903f088 100644 --- a/st2api/st2api/cmd/api.py +++ b/st2api/st2api/cmd/api.py @@ -31,6 +31,7 @@ from st2common import log as logging from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2api import config config.register_opts(ignore_errors=True) @@ -41,6 +42,7 @@ __all__ = ["main"] LOG = logging.getLogger(__name__) +API = "api" # How much time to give to the request in progress to finish in seconds before killing them WSGI_SERVER_REQUEST_SHUTDOWN_TIME = 2 @@ -55,7 +57,7 @@ def _setup(): } common_setup( - service="api", + service=API, config=config, setup_db=True, register_mq_exchanges=True, @@ -94,6 +96,7 @@ def main(): _setup() return _run_server() except SystemExit as exit_code: + deregister_service(API) sys.exit(exit_code) except Exception: LOG.exception("(PID=%s) ST2 API quit due to exception.", os.getpid()) diff --git a/st2auth/st2auth/cmd/api.py b/st2auth/st2auth/cmd/api.py index e10b774872..e817765d97 100644 --- a/st2auth/st2auth/cmd/api.py +++ b/st2auth/st2auth/cmd/api.py @@ -27,6 +27,7 @@ from st2common import log as logging from 
st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2auth import config config.register_opts(ignore_errors=True) @@ -39,6 +40,7 @@ LOG = logging.getLogger(__name__) +AUTH = "auth" def _setup(): @@ -50,7 +52,7 @@ def _setup(): "type": "active", } common_setup( - service="auth", + service=AUTH, config=config, setup_db=True, register_mq_exchanges=False, @@ -108,6 +110,7 @@ def main(): _setup() return _run_server() except SystemExit as exit_code: + deregister_service(AUTH) sys.exit(exit_code) except Exception: LOG.exception("(PID=%s) ST2 Auth API quit due to exception.", os.getpid()) diff --git a/st2common/st2common/service_setup.py b/st2common/st2common/service_setup.py index 2c87c8fcaa..9e8ae7e07f 100644 --- a/st2common/st2common/service_setup.py +++ b/st2common/st2common/service_setup.py @@ -29,6 +29,8 @@ import eventlet.debug from oslo_config import cfg from tooz.coordination import GroupAlreadyExist +from tooz.coordination import GroupNotCreated +from tooz.coordination import MemberNotJoined from st2common import log as logging from st2common.constants.logging import DEFAULT_LOGGING_CONF_PATH @@ -62,6 +64,7 @@ "db_setup", "db_teardown", "register_service_in_service_registry", + "deregister_service", ] # Message which is logged if non utf-8 locale is detected on startup. 
@@ -339,3 +342,22 @@ def register_service_in_service_registry(service, capabilities=None, start_heart % (group_id, member_id, capabilities) ) return coordinator.join_group(group_id, capabilities=capabilities).get() + + +def deregister_service(service, start_heart=True): + + if not isinstance(service, six.binary_type): + group_id = service.encode("utf-8") + else: + group_id = service + + coordinator = coordination.get_coordinator(start_heart=start_heart) + + member_id = coordination.get_member_id() + LOG.debug( + 'Leaving service registry group "%s" as member_id "%s"' % (group_id, member_id) + ) + try: + coordinator.leave_group(group_id).get() + except (GroupNotCreated, MemberNotJoined): + pass diff --git a/st2common/st2common/services/coordination.py b/st2common/st2common/services/coordination.py index fadfd2768f..15045d0eff 100644 --- a/st2common/st2common/services/coordination.py +++ b/st2common/st2common/services/coordination.py @@ -21,6 +21,7 @@ from tooz import coordination from tooz import locking from tooz.coordination import GroupNotCreated +from tooz.coordination import MemberNotJoined from st2common import log as logging from st2common.util import system_info @@ -124,8 +125,15 @@ def join_group(cls, group_id, capabilities=""): @classmethod def leave_group(cls, group_id): member_id = get_member_id() + try: + members = cls.groups[group_id]["members"] + except KeyError: + raise GroupNotCreated(group_id) - del cls.groups[group_id]["members"][member_id] + try: + del members[member_id] + except KeyError: + raise MemberNotJoined(group_id, member_id) return NoOpAsyncResult() @classmethod diff --git a/st2common/tests/unit/test_service_setup.py b/st2common/tests/unit/test_service_setup.py index a2c00d120d..bb563c1c69 100644 --- a/st2common/tests/unit/test_service_setup.py +++ b/st2common/tests/unit/test_service_setup.py @@ -23,6 +23,7 @@ from oslo_config.cfg import ConfigFilesNotFoundError from st2common import service_setup +from st2common.services import 
coordination from st2common.transport.bootstrap_utils import register_exchanges from st2common.transport.bootstrap_utils import QUEUES @@ -216,3 +217,23 @@ def mock_get_logging_config_path(): run_migrations=False, register_runners=False, ) + + def test_deregister_service_when_service_registry_enabled(self): + service = "api" + service_setup.register_service_in_service_registry( + service, capabilities={"hostname": "", "pid": ""} + ) + coordinator = coordination.get_coordinator(start_heart=True) + members = coordinator.get_members(service.encode("utf-8")) + self.assertEqual(len(list(members.get())), 1) + service_setup.deregister_service(service) + self.assertEqual(len(list(members.get())), 0) + + def test_deregister_service_when_service_registry_disables(self): + service = "api" + try: + service_setup.deregister_service(service) + except: + assert False, "service_setup.deregister_service raised exception" + + assert True diff --git a/st2reactor/st2reactor/cmd/garbagecollector.py b/st2reactor/st2reactor/cmd/garbagecollector.py index b4be4dfa8b..4d25fa4c44 100644 --- a/st2reactor/st2reactor/cmd/garbagecollector.py +++ b/st2reactor/st2reactor/cmd/garbagecollector.py @@ -28,6 +28,7 @@ from st2common.logging.misc import get_logger_name_for_module from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2common.constants.exit_codes import FAILURE_EXIT_CODE from st2reactor.garbage_collector import config from st2reactor.garbage_collector.base import GarbageCollectorService @@ -37,12 +38,13 @@ LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__]) LOG = logging.getLogger(LOGGER_NAME) +GARBAGE_COLLECTOR = "garbagecollector" def _setup(): capabilities = {"name": "garbagecollector", "type": "passive"} common_setup( - service="garbagecollector", + service=GARBAGE_COLLECTOR, config=config, setup_db=True, register_mq_exchanges=True, @@ -68,6 
+70,7 @@ def main(): ) exit_code = garbage_collector.run() except SystemExit as exit_code: + deregister_service(GARBAGE_COLLECTOR) return exit_code except: LOG.exception("(PID:%s) GarbageCollector quit due to exception.", os.getpid()) diff --git a/st2reactor/st2reactor/cmd/rulesengine.py b/st2reactor/st2reactor/cmd/rulesengine.py index 895fbe42d9..3629345324 100644 --- a/st2reactor/st2reactor/cmd/rulesengine.py +++ b/st2reactor/st2reactor/cmd/rulesengine.py @@ -26,18 +26,20 @@ from st2common.logging.misc import get_logger_name_for_module from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2reactor.rules import config from st2reactor.rules import worker LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__]) LOG = logging.getLogger(LOGGER_NAME) +RULESENGINE = "rulesengine" def _setup(): capabilities = {"name": "rulesengine", "type": "passive"} common_setup( - service="rulesengine", + service=RULESENGINE, config=config, setup_db=True, register_mq_exchanges=True, @@ -63,6 +65,7 @@ def _run_worker(): return rules_engine_worker.wait() except (KeyboardInterrupt, SystemExit): LOG.info("(PID=%s) RulesEngine stopped.", os.getpid()) + deregister_service(RULESENGINE) rules_engine_worker.shutdown() except: LOG.exception("(PID:%s) RulesEngine quit due to exception.", os.getpid()) diff --git a/st2reactor/st2reactor/cmd/sensormanager.py b/st2reactor/st2reactor/cmd/sensormanager.py index f3d27afb5b..51abcaa6ac 100644 --- a/st2reactor/st2reactor/cmd/sensormanager.py +++ b/st2reactor/st2reactor/cmd/sensormanager.py @@ -28,6 +28,7 @@ from st2common.logging.misc import get_logger_name_for_module from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2common.exceptions.sensors import SensorNotFoundException from 
st2common.constants.exit_codes import FAILURE_EXIT_CODE from st2reactor.sensor import config @@ -39,12 +40,13 @@ LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__]) LOG = logging.getLogger(LOGGER_NAME) +SENSOR_CONTAINER = "sensorcontainer" def _setup(): capabilities = {"name": "sensorcontainer", "type": "passive"} common_setup( - service="sensorcontainer", + service=SENSOR_CONTAINER, config=config, setup_db=True, register_mq_exchanges=True, @@ -80,6 +82,7 @@ def main(): ) return container_manager.run_sensors() except SystemExit as exit_code: + deregister_service(SENSOR_CONTAINER) return exit_code except SensorNotFoundException as e: LOG.exception(e) diff --git a/st2reactor/st2reactor/cmd/timersengine.py b/st2reactor/st2reactor/cmd/timersengine.py index 9b4edd52b5..49f91f949c 100644 --- a/st2reactor/st2reactor/cmd/timersengine.py +++ b/st2reactor/st2reactor/cmd/timersengine.py @@ -30,18 +30,20 @@ from st2common.logging.misc import get_logger_name_for_module from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2reactor.timer import config from st2reactor.timer.base import St2Timer LOGGER_NAME = get_logger_name_for_module(sys.modules[__name__]) LOG = logging.getLogger(LOGGER_NAME) +TIMER_ENGINE = "timer_engine" def _setup(): capabilities = {"name": "timerengine", "type": "passive"} common_setup( - service="timer_engine", + service=TIMER_ENGINE, config=config, setup_db=True, register_mq_exchanges=True, @@ -78,6 +80,7 @@ def _run_worker(): LOG.info(TIMER_DISABLED_LOG_LINE) except (KeyboardInterrupt, SystemExit): LOG.info("(PID=%s) TimerEngine stopped.", os.getpid()) + deregister_service(TIMER_ENGINE) except: LOG.exception("(PID:%s) TimerEngine quit due to exception.", os.getpid()) return 1 diff --git a/st2stream/st2stream/cmd/api.py b/st2stream/st2stream/cmd/api.py index 35f0c5b3bd..3de0c89f03 100644 --- 
a/st2stream/st2stream/cmd/api.py +++ b/st2stream/st2stream/cmd/api.py @@ -27,6 +27,7 @@ from st2common import log as logging from st2common.service_setup import setup as common_setup from st2common.service_setup import teardown as common_teardown +from st2common.service_setup import deregister_service from st2common.stream.listener import get_listener_if_set from st2common.util.wsgi import shutdown_server_kill_pending_requests from st2stream.signal_handlers import register_stream_signal_handlers @@ -48,6 +49,7 @@ ) LOG = logging.getLogger(__name__) +STREAM = "stream" # How much time to give to the request in progress to finish in seconds before killing them WSGI_SERVER_REQUEST_SHUTDOWN_TIME = 2 @@ -61,7 +63,7 @@ def _setup(): "type": "active", } common_setup( - service="stream", + service=STREAM, config=config, setup_db=True, register_mq_exchanges=True, @@ -86,6 +88,7 @@ def _run_server(): sock = eventlet.listen((host, port)) def queue_shutdown(signal_number, stack_frame): + deregister_service(STREAM) eventlet.spawn_n( shutdown_server_kill_pending_requests, sock=sock, @@ -111,8 +114,10 @@ def main(): _setup() return _run_server() except SystemExit as exit_code: + deregister_service(STREAM) sys.exit(exit_code) except KeyboardInterrupt: + deregister_service(STREAM) listener = get_listener_if_set(name="stream") if listener: