diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7d2fd46715..766ebcb3f3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -11,6 +11,10 @@ Changed * Update requests library to latest version (2.22.0) in requirements. (improvement) #4680 * Disallow "decrypt_kv" filter to be specified in the config for values that are marked as "secret: True" in the schema. (improvement) #4709 +* Upgrade ``tooz`` library to latest stable version (1.65.0) so it uses latest version of + ``grpcio`` library. (improvement) #4713 +* Update ``st2-pack-install`` and ``st2-pack-download`` CLI command so it supports installing + packs from local directories which are not git repositories. (improvement) #4713 Fixed ~~~~~ @@ -19,6 +23,14 @@ Fixed * Allow tasks defined in the same task transition with ``fail`` to run for orquesta. (bug fix) * Fix workflow service to handle unexpected coordinator and database errors. (bug fix) #4704 #4705 * Fix filter ``to_yaml_string`` to handle mongoengine base types for dict and list. (bug fix) #4700 +* Fix timeout handling in the Python runner. In some scenarios where action would time out before + producing any output (stdout, stder), timeout was not correctly propagated to the user. (bug fix) + #4713 +* Update ``st2common/setup.py`` file so it correctly declares all the dependencies and script + files it provides. This way ``st2-pack-*`` commands can be used in a standalone fashion just by + installing ``st2common`` Python package and nothing else. (bug fix) #4713 +* Fix ``st2-pack-download`` command so it works in the environments where ``sudo`` binary is not + available (e.g. Docker). (bug fix) #4713 3.0.1 - May 24, 2019 -------------------- diff --git a/contrib/packs/tests/fixtures/stackstorm-test4/pack.yaml b/contrib/packs/tests/fixtures/stackstorm-test4/pack.yaml new file mode 100644 index 0000000000..aa946e815f --- /dev/null +++ b/contrib/packs/tests/fixtures/stackstorm-test4/pack.yaml @@ -0,0 +1,11 @@ +--- +name : test4 +description : st2 pack to test package management pipeline +keywords: + - some + - search + - another + - terms +version : "0.4.0" +author : st2-dev +email : info@stackstorm.com diff --git a/contrib/packs/tests/test_action_download.py b/contrib/packs/tests/test_action_download.py index a2171b23eb..dc31c492e7 100644 --- a/contrib/packs/tests/test_action_download.py +++ b/contrib/packs/tests/test_action_download.py @@ -35,6 +35,8 @@ from pack_mgmt.download import DownloadGitRepoAction +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + PACK_INDEX = { "test": { "version": "0.4.0", @@ -67,6 +69,18 @@ } +original_is_dir_func = os.path.isdir + + +def mock_is_dir_func(path): + """ + Mock function which returns True if path ends with .git + """ + if path.endswith('.git'): + return True + return original_is_dir_func(path) + + @mock.patch.object(pack_service, 'fetch_pack_index', mock.MagicMock(return_value=(PACK_INDEX, {}))) class DownloadGitRepoActionTestCase(BaseActionTestCase): action_cls = DownloadGitRepoAction @@ -79,8 +93,9 @@ def setUp(self): self.addCleanup(clone_from.stop) self.clone_from = clone_from.start() + self.expand_user_path = tempfile.mkdtemp() expand_user = mock.patch.object(os.path, 'expanduser', - mock.MagicMock(return_value=tempfile.mkdtemp())) + mock.MagicMock(return_value=self.expand_user_path)) self.addCleanup(expand_user.stop) self.expand_user = expand_user.start() @@ -522,16 +537,38 @@ def fake_commit(arg_ref): result = action.run(packs=packs, abs_repo_base=self.repo_base) self.assertEqual(result, {'test': 'Success.'}) + @mock.patch('os.path.isdir', mock_is_dir_func) def test_run_pack_dowload_local_git_repo_detached_head_state(self): action = self.get_action_instance() type(self.repo_instance).active_branch = \ mock.PropertyMock(side_effect=TypeError('detached head')) - result = action.run(packs=['file:///stackstorm-test'], abs_repo_base=self.repo_base) + pack_path = os.path.join(BASE_DIR, 'fixtures/stackstorm-test') + + result = action.run(packs=['file://%s' % (pack_path)], abs_repo_base=self.repo_base) self.assertEqual(result, {'test': 'Success.'}) # Verify function has bailed out early self.repo_instance.git.checkout.assert_not_called() self.repo_instance.git.branch.assert_not_called() self.repo_instance.git.checkout.assert_not_called() + + def test_run_pack_download_local_directory(self): + action = self.get_action_instance() + + # 1. Local directory doesn't exist + expected_msg = r'Local pack directory ".*" doesn\'t exist' + self.assertRaisesRegexp(ValueError, expected_msg, action.run, + packs=['file://doesnt_exist'], abs_repo_base=self.repo_base) + + # 2. Local pack which is not a git repository + pack_path = os.path.join(BASE_DIR, 'fixtures/stackstorm-test4') + + result = action.run(packs=['file://%s' % (pack_path)], abs_repo_base=self.repo_base) + self.assertEqual(result, {'test4': 'Success.'}) + + # Verify pack contents have been copied over + destination_path = os.path.join(self.repo_base, 'test4') + self.assertTrue(os.path.exists(destination_path)) + self.assertTrue(os.path.exists(os.path.join(destination_path, 'pack.yaml'))) diff --git a/contrib/runners/local_runner/local_runner/base.py b/contrib/runners/local_runner/local_runner/base.py index e2bf5c8035..2d66df0df2 100644 --- a/contrib/runners/local_runner/local_runner/base.py +++ b/contrib/runners/local_runner/local_runner/base.py @@ -22,7 +22,6 @@ import six from oslo_config import cfg -from eventlet.green import subprocess from six.moves import StringIO from st2common.constants import action as action_constants @@ -35,6 +34,7 @@ from st2common.util.green import shell from st2common.util.shell import kill_process from st2common.util import jsonify +from st2common.util import concurrency from st2common.services.action import store_execution_output_data from st2common.runners.utils import make_read_and_store_stream_func @@ -138,13 +138,15 @@ def _run(self, action): read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution, action_db=self.action, store_data_func=store_execution_stderr_line) + subprocess = concurrency.get_subprocess_module() + # If sudo password is provided, pass it to the subprocess via stdin> # Note: We don't need to explicitly escape the argument because we pass command as a list # to subprocess.Popen and all the arguments are escaped by the function. if self._sudo_password: LOG.debug('Supplying sudo password via stdin') - echo_process = subprocess.Popen(['echo', self._sudo_password + '\n'], - stdout=subprocess.PIPE) + echo_process = concurrency.subprocess_popen(['echo', self._sudo_password + '\n'], + stdout=subprocess.PIPE) stdin = echo_process.stdout else: stdin = None diff --git a/contrib/runners/local_runner/tests/integration/test_localrunner.py b/contrib/runners/local_runner/tests/integration/test_localrunner.py index cdcbcfcdd3..c62d47e58d 100644 --- a/contrib/runners/local_runner/tests/integration/test_localrunner.py +++ b/contrib/runners/local_runner/tests/integration/test_localrunner.py @@ -152,8 +152,8 @@ def test_sudo_and_env_variable_preservation(self): self.assertEquals(status, action_constants.LIVEACTION_STATUS_SUCCEEDED) self.assertEqual(result['stdout'].strip(), 'root\nponiesponies') - @mock.patch('st2common.util.green.shell.subprocess.Popen') - @mock.patch('st2common.util.green.shell.eventlet.spawn') + @mock.patch('st2common.util.concurrency.subprocess_popen') + @mock.patch('st2common.util.concurrency.spawn') def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_popen): # Feature is enabled cfg.CONF.set_override(name='stream_output', group='actionrunner', override=True) @@ -210,8 +210,8 @@ def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_pop self.assertEqual(output_dbs[1].data, mock_stderr[1]) self.assertEqual(output_dbs[2].data, mock_stderr[2]) - @mock.patch('st2common.util.green.shell.subprocess.Popen') - @mock.patch('st2common.util.green.shell.eventlet.spawn') + @mock.patch('st2common.util.concurrency.subprocess_popen') + @mock.patch('st2common.util.concurrency.spawn') def test_action_stdout_and_stderr_is_stored_in_the_db_short_running_action(self, mock_spawn, mock_popen): # Verify that we correctly retrieve all the output and wait for stdout and stderr reading @@ -331,7 +331,7 @@ def test_shell_command_sudo_password_is_passed_to_sudo_binary(self): self.assertEquals(result['stdout'], sudo_password) # Verify new process which provides password via stdin to the command is created - with mock.patch('eventlet.green.subprocess.Popen') as mock_subproc_popen: + with mock.patch('st2common.util.concurrency.subprocess_popen') as mock_subproc_popen: index = 0 for sudo_password in sudo_passwords: runner = self._get_runner(action_db, cmd=cmd) @@ -510,8 +510,8 @@ def test_script_with_parameters_parameter_serialization(self): output_dbs = ActionExecutionOutput.query(output_type='stderr') self.assertEqual(len(output_dbs), 0) - @mock.patch('st2common.util.green.shell.subprocess.Popen') - @mock.patch('st2common.util.green.shell.eventlet.spawn') + @mock.patch('st2common.util.concurrency.subprocess_popen') + @mock.patch('st2common.util.concurrency.spawn') def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_popen): # Feature is enabled cfg.CONF.set_override(name='stream_output', group='actionrunner', override=True) diff --git a/contrib/runners/python_runner/python_runner/python_runner.py b/contrib/runners/python_runner/python_runner/python_runner.py index 0931853072..a1f42101b0 100644 --- a/contrib/runners/python_runner/python_runner/python_runner.py +++ b/contrib/runners/python_runner/python_runner/python_runner.py @@ -318,15 +318,17 @@ def _get_output_values(self, exit_code, stdout, stderr, timed_out): action_result = split[1].strip() stdout = split[0] + split[2] else: + # Timeout or similar action_result = None - # Parse the serialized action result object - try: - action_result = json.loads(action_result) - except Exception as e: - # Failed to de-serialize the result, probably it contains non-simple type or similar - LOG.warning('Failed to de-serialize result "%s": %s' % (str(action_result), - six.text_type(e))) + # Parse the serialized action result object (if available) + if action_result: + try: + action_result = json.loads(action_result) + except Exception as e: + # Failed to de-serialize the result, probably it contains non-simple type or similar + LOG.warning('Failed to de-serialize result "%s": %s' % (str(action_result), + six.text_type(e))) if action_result: if isinstance(action_result, dict): diff --git a/contrib/runners/python_runner/tests/unit/test_pythonrunner.py b/contrib/runners/python_runner/tests/unit/test_pythonrunner.py index 4289f9f031..9c242f237f 100644 --- a/contrib/runners/python_runner/tests/unit/test_pythonrunner.py +++ b/contrib/runners/python_runner/tests/unit/test_pythonrunner.py @@ -251,7 +251,7 @@ def test_simple_action_no_entry_point(self): expected_msg = 'Action .*? is missing entry_point attribute' self.assertRaisesRegexp(Exception, expected_msg, runner.run, {}) - @mock.patch('st2common.util.green.shell.subprocess.Popen') + @mock.patch('st2common.util.concurrency.subprocess_popen') def test_action_with_user_supplied_env_vars(self, mock_popen): env_vars = {'key1': 'val1', 'key2': 'val2', 'PYTHONPATH': 'foobar'} @@ -275,8 +275,8 @@ def test_action_with_user_supplied_env_vars(self, mock_popen): else: self.assertEqual(actual_env[key], value) - @mock.patch('st2common.util.green.shell.subprocess.Popen') - @mock.patch('st2common.util.green.shell.eventlet.spawn') + @mock.patch('st2common.util.concurrency.subprocess_popen') + @mock.patch('st2common.util.concurrency.spawn') def test_action_stdout_and_stderr_is_not_stored_in_db_by_default(self, mock_spawn, mock_popen): # Feature should be disabled by default values = {'delimiter': ACTION_OUTPUT_RESULT_DELIMITER} @@ -344,8 +344,8 @@ def test_action_stdout_and_stderr_is_not_stored_in_db_by_default(self, mock_spaw output_dbs = ActionExecutionOutput.get_all() self.assertEqual(len(output_dbs), 0) - @mock.patch('st2common.util.green.shell.subprocess.Popen') - @mock.patch('st2common.util.green.shell.eventlet.spawn') + @mock.patch('st2common.util.concurrency.subprocess_popen') + @mock.patch('st2common.util.concurrency.spawn') def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_popen): # Feature is enabled cfg.CONF.set_override(name='stream_output', group='actionrunner', override=True) @@ -406,7 +406,7 @@ def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_pop self.assertEqual(output_dbs[1].data, mock_stderr[1]) self.assertEqual(output_dbs[2].data, mock_stderr[2]) - @mock.patch('st2common.util.green.shell.subprocess.Popen') + @mock.patch('st2common.util.concurrency.subprocess_popen') def test_stdout_interception_and_parsing(self, mock_popen): values = {'delimiter': ACTION_OUTPUT_RESULT_DELIMITER} @@ -478,7 +478,7 @@ def test_stdout_interception_and_parsing(self, mock_popen): self.assertEqual(output['exit_code'], 0) self.assertEqual(status, 'succeeded') - @mock.patch('st2common.util.green.shell.subprocess.Popen') + @mock.patch('st2common.util.concurrency.subprocess_popen') def test_common_st2_env_vars_are_available_to_the_action(self, mock_popen): mock_process = mock.Mock() mock_process.communicate.return_value = ('', '') @@ -495,7 +495,7 @@ def test_common_st2_env_vars_are_available_to_the_action(self, mock_popen): actual_env = call_kwargs['env'] self.assertCommonSt2EnvVarsAvailableInEnv(env=actual_env) - @mock.patch('st2common.util.green.shell.subprocess.Popen') + @mock.patch('st2common.util.concurrency.subprocess_popen') def test_pythonpath_env_var_contains_common_libs_config_enabled(self, mock_popen): mock_process = mock.Mock() mock_process.communicate.return_value = ('', '') @@ -515,7 +515,7 @@ def test_pythonpath_env_var_contains_common_libs_config_enabled(self, mock_popen self.assertTrue('PYTHONPATH' in actual_env) self.assertTrue(pack_common_lib_path in actual_env['PYTHONPATH']) - @mock.patch('st2common.util.green.shell.subprocess.Popen') + @mock.patch('st2common.util.concurrency.subprocess_popen') def test_pythonpath_env_var_not_contains_common_libs_config_disabled(self, mock_popen): mock_process = mock.Mock() mock_process.communicate.return_value = ('', '') @@ -806,7 +806,7 @@ def test_content_version_success(self, mock_get_sandbox_virtualenv_path): self.assertRaisesRegexp(ValueError, expected_msg, runner.pre_run) @mock.patch('python_runner.python_runner.get_sandbox_virtualenv_path') - @mock.patch('st2common.util.green.shell.subprocess.Popen') + @mock.patch('st2common.util.concurrency.subprocess_popen') def test_content_version_contains_common_libs_config_enabled(self, mock_popen, mock_get_sandbox_virtualenv_path): # Verify that the common libs path correctly reflects directory in git worktree diff --git a/fixed-requirements.txt b/fixed-requirements.txt index 6d98f80b41..0695cb8e3e 100644 --- a/fixed-requirements.txt +++ b/fixed-requirements.txt @@ -39,7 +39,7 @@ virtualenv==16.6.0 sseclient-py==1.7 python-editor==1.0.4 prompt-toolkit==1.0.15 -tooz==1.64.2 +tooz==1.65.0 zake==0.2.2 routes==2.4.1 webob==1.8.4 diff --git a/requirements.txt b/requirements.txt index d893792cc2..ce036d83b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -52,7 +52,7 @@ semver==2.8.1 six==1.12.0 sseclient-py==1.7 stevedore==1.30.1 -tooz==1.64.2 +tooz==1.65.0 ujson==1.35 unittest2 webob==1.8.4 diff --git a/st2common/in-requirements.txt b/st2common/in-requirements.txt index 9db51c48ad..2796e02a67 100644 --- a/st2common/in-requirements.txt +++ b/st2common/in-requirements.txt @@ -33,3 +33,6 @@ ujson # Note: amqp is used by kombu, this needs to be added here to be picked up by # requirements fixate script. amqp +# Used by st2-pack-* commands +gitpython +lockfile diff --git a/st2common/requirements.txt b/st2common/requirements.txt index 70cfd57de7..f8f8d0711b 100644 --- a/st2common/requirements.txt +++ b/st2common/requirements.txt @@ -5,12 +5,14 @@ cryptography==2.6.1 eventlet==0.24.1 flex==6.14.0 git+/~https://github.com/StackStorm/orquesta.git@v1.0.0#egg=orquesta +gitpython==2.1.11 greenlet==0.4.15 ipaddr jinja2==2.10.1 jsonpath-rw==1.4.0 jsonschema==2.6.0 kombu==4.5.0 +lockfile==0.12.2 mongoengine==0.17.0 networkx==1.11 oslo.config<1.13,>=1.12.1 @@ -24,7 +26,7 @@ retrying==1.3.3 routes==2.4.1 semver==2.8.1 six==1.12.0 -tooz==1.64.2 +tooz==1.65.0 ujson==1.35 webob==1.8.4 zake==0.2.2 diff --git a/st2common/setup.py b/st2common/setup.py index b0245eca3f..3e7bd4fb5c 100644 --- a/st2common/setup.py +++ b/st2common/setup.py @@ -53,14 +53,13 @@ 'bin/st2-run-pack-tests', 'bin/st2ctl', 'bin/st2-generate-symmetric-crypto-key', - 'bin/migrations/v1.5/st2-migrate-datastore-to-include-scope-secret.py', - 'bin/migrations/v2.1/st2-migrate-datastore-scopes.py', - 'bin/migrations/v2.1/st2-migrate-runners.sh', 'bin/st2-self-check', 'bin/st2-track-result', 'bin/st2-validate-pack-config', 'bin/st2-check-license', - 'bin/st2-pack-install' + 'bin/st2-pack-install', + 'bin/st2-pack-download', + 'bin/st2-pack-setup-virtualenv' ], entry_points={ 'st2common.metrics.driver': [ diff --git a/st2common/st2common/runners/base.py b/st2common/st2common/runners/base.py index b9caa6b625..c4214fe5ca 100644 --- a/st2common/st2common/runners/base.py +++ b/st2common/st2common/runners/base.py @@ -48,6 +48,8 @@ 'PollingAsyncActionRunner', 'ShellRunnerMixin', + 'get_runner_module', + 'get_runner', 'get_metadata', @@ -70,6 +72,25 @@ def get_runner(name, config=None): """ LOG.debug('Runner loading Python module: %s', name) + module = get_runner_module(name=name) + + LOG.debug('Instance of runner module: %s', module) + + if config: + runner_kwargs = {'config': config} + else: + runner_kwargs = {} + + runner = module.get_runner(**runner_kwargs) + LOG.debug('Instance of runner: %s', runner) + return runner + + +def get_runner_module(name): + """ + Load runner driver and return reference to the runner driver module. + """ + # NOTE: For backward compatibility we also support "_" in place of "-" from stevedore.exception import NoMatches @@ -90,16 +111,7 @@ def get_runner(name, config=None): raise exc.ActionRunnerCreateError('%s\n\n%s' % (msg, six.text_type(e))) - LOG.debug('Instance of runner module: %s', module) - - if config: - runner_kwargs = {'config': config} - else: - runner_kwargs = {} - - runner = module.get_runner(**runner_kwargs) - LOG.debug('Instance of runner: %s', runner) - return runner + return module def get_query_module(name): diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py new file mode 100644 index 0000000000..62544eec3a --- /dev/null +++ b/st2common/st2common/util/concurrency.py @@ -0,0 +1,113 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module which acts as a compatibility later between eventlet and gevent. + +It dispatches function call to the concurrency library which is configured using +"set_concurrency_library" functoion. +""" + +try: + import eventlet # pylint: disable=import-error +except ImportError: + eventlet = None + +try: + import gevent # pylint: disable=import-error +except ImportError: + gevent = None + +CONCURRENCY_LIBRARY = 'eventlet' + +__all__ = [ + 'set_concurrency_library', + 'get_concurrency_library', + + 'get_subprocess_module', + 'subprocess_popen', + + 'spawn', + 'wait', + 'cancel', + 'kill' +] + + +def set_concurrency_library(library): + global CONCURRENCY_LIBRARY + + if library not in ['eventlet', 'gevent']: + raise ValueError('Unsupported concurrency library: %s' % (library)) + + CONCURRENCY_LIBRARY = library + + +def get_concurrency_library(): + global CONCURRENCY_LIBRARY + return CONCURRENCY_LIBRARY + + +def get_subprocess_module(): + if CONCURRENCY_LIBRARY == 'eventlet': + from eventlet.green import subprocess # pylint: disable=import-error + return subprocess + elif CONCURRENCY_LIBRARY == 'gevent': + from gevent import subprocess # pylint: disable=import-error + return subprocess + + +def subprocess_popen(*args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + from eventlet.green import subprocess # pylint: disable=import-error + return subprocess.Popen(*args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + from gevent import subprocess # pylint: disable=import-error + return subprocess.Popen(*args, **kwargs) + + +def spawn(func, *args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + return eventlet.spawn(func, *args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + return gevent.spawn(func, *args, **kwargs) + else: + raise ValueError('Unsupported concurrency library') + + +def wait(green_thread, *args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + return green_thread.wait(*args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + return green_thread.join(*args, **kwargs) + else: + raise ValueError('Unsupported concurrency library') + + +def cancel(green_thread, *args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + return green_thread.cancel(*args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + return green_thread.kill(*args, **kwargs) + else: + raise ValueError('Unsupported concurrency library') + + +def kill(green_thread, *args, **kwargs): + if CONCURRENCY_LIBRARY == 'eventlet': + return green_thread.kill(*args, **kwargs) + elif CONCURRENCY_LIBRARY == 'gevent': + return green_thread.kill(*args, **kwargs) + else: + raise ValueError('Unsupported concurrency library') diff --git a/st2common/st2common/util/green/shell.py b/st2common/st2common/util/green/shell.py index 3b629e41e6..dee5be3e29 100644 --- a/st2common/st2common/util/green/shell.py +++ b/st2common/st2common/util/green/shell.py @@ -13,18 +13,18 @@ # limitations under the License. """ -Shell utility functions which use non-blocking and eventlet friendly code. +Shell utility functions which use non-blocking and eventlet / gevent friendly code. """ from __future__ import absolute_import import os +import subprocess import six -import eventlet -from eventlet.green import subprocess from st2common import log as logging +from st2common.util import concurrency __all__ = [ 'run_command' @@ -101,19 +101,21 @@ def run_command(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, LOG.debug('env argument not provided. using process env (os.environ).') env = os.environ.copy() - # Note: We are using eventlet friendly implementation of subprocess - # which uses GreenPipe so it doesn't block + subprocess = concurrency.get_subprocess_module() + + # Note: We are using eventlet / gevent friendly implementation of subprocess which uses + # GreenPipe so it doesn't block LOG.debug('Creating subprocess.') - process = subprocess.Popen(args=cmd, stdin=stdin, stdout=stdout, stderr=stderr, - env=env, cwd=cwd, shell=shell, preexec_fn=preexec_func) + process = concurrency.subprocess_popen(args=cmd, stdin=stdin, stdout=stdout, stderr=stderr, + env=env, cwd=cwd, shell=shell, preexec_fn=preexec_func) if read_stdout_func: LOG.debug('Spawning read_stdout_func function') - read_stdout_thread = eventlet.spawn(read_stdout_func, process.stdout, read_stdout_buffer) + read_stdout_thread = concurrency.spawn(read_stdout_func, process.stdout, read_stdout_buffer) if read_stderr_func: LOG.debug('Spawning read_stderr_func function') - read_stderr_thread = eventlet.spawn(read_stderr_func, process.stderr, read_stderr_buffer) + read_stderr_thread = concurrency.spawn(read_stderr_func, process.stderr, read_stderr_buffer) def on_timeout_expired(timeout): global timed_out @@ -125,8 +127,6 @@ def on_timeout_expired(timeout): # Command has timed out, kill the process and propagate the error. # Note: We explicitly set the returncode to indicate the timeout. LOG.debug('Command execution timeout reached.') - process.returncode = TIMEOUT_EXIT_CODE - if kill_func: LOG.debug('Calling kill_func.') kill_func(process=process) @@ -134,13 +134,17 @@ def on_timeout_expired(timeout): LOG.debug('Killing process.') process.kill() + # NOTE: It's imporant to set returncode here, since call to kill() + # sets it and overwrites it if we set it earlier + process.returncode = TIMEOUT_EXIT_CODE + if read_stdout_func and read_stderr_func: LOG.debug('Killing read_stdout_thread and read_stderr_thread') - read_stdout_thread.kill() - read_stderr_thread.kill() + concurrency.kill(read_stdout_thread) + concurrency.kill(read_stderr_thread) LOG.debug('Spawning timeout handler thread.') - timeout_thread = eventlet.spawn(on_timeout_expired, timeout) + timeout_thread = concurrency.spawn(on_timeout_expired, timeout) LOG.debug('Attaching to process.') if stdin_value: @@ -156,13 +160,13 @@ def on_timeout_expired(timeout): LOG.debug('Using delayed stdout and stderr read mode, calling process.communicate()') stdout, stderr = process.communicate() - timeout_thread.cancel() + concurrency.cancel(timeout_thread) exit_code = process.returncode if read_stdout_func and read_stderr_func: # Wait on those green threads to finish reading from stdout and stderr before continuing - read_stdout_thread.wait() - read_stderr_thread.wait() + concurrency.wait(read_stdout_thread) + concurrency.wait(read_stderr_thread) stdout = read_stdout_buffer.getvalue() stderr = read_stderr_buffer.getvalue() diff --git a/st2common/st2common/util/pack_management.py b/st2common/st2common/util/pack_management.py index f0ecf6a484..a84b41553d 100644 --- a/st2common/st2common/util/pack_management.py +++ b/st2common/st2common/util/pack_management.py @@ -30,6 +30,7 @@ from git.repo import Repo from gitdb.exc import BadName, BadObject from lockfile import LockFile +from distutils.spawn import find_executable from st2common import log as logging from st2common.content import utils @@ -63,6 +64,8 @@ CURRENT_STACKSTORM_VERSION = get_stackstorm_version() CURRENT_PYTHON_VERSION = get_python_version() +SUDO_BINARY = find_executable('sudo') + def download_pack(pack, abs_repo_base='/opt/stackstorm/packs', verify_ssl=True, force=False, proxy_config=None, force_owner_group=True, force_permissions=True, @@ -128,9 +131,28 @@ def download_pack(pack, abs_repo_base='/opt/stackstorm/packs', verify_ssl=True, user_home = os.path.expanduser('~') abs_local_path = os.path.join(user_home, temp_dir_name) - # 1. Clone / download the repo - clone_repo(temp_dir=abs_local_path, repo_url=pack_url, verify_ssl=verify_ssl, - ref=pack_version) + if pack_url.startswith('file://'): + # Local pack + local_pack_directory = os.path.abspath(os.path.join(pack_url.split('file://')[1])) + else: + local_pack_directory = None + + # If it's a local pack which is not a git repository, just copy the directory content + # over + if local_pack_directory and not os.path.isdir( + os.path.join(local_pack_directory, '.git')): + if not os.path.isdir(local_pack_directory): + raise ValueError('Local pack directory "%s" doesn\'t exist' % + (local_pack_directory)) + + logger.debug('Detected local pack directory which is not a git repository, just ' + 'copying files over...') + + shutil.copytree(local_pack_directory, abs_local_path) + else: + # 1. Clone / download the repo + clone_repo(temp_dir=abs_local_path, repo_url=pack_url, verify_ssl=verify_ssl, + ref=pack_version) pack_ref = get_pack_ref(pack_dir=abs_local_path) result[1] = pack_ref @@ -237,7 +259,7 @@ def clone_repo(temp_dir, repo_url, verify_ssl=True, ref='master'): def move_pack(abs_repo_base, pack_name, abs_local_path, force_owner_group=True, - force_permissions=True, logger=LOG): + force_permissions=True, logger=LOG): """ Move pack directory into the final location. """ @@ -289,7 +311,14 @@ def apply_pack_owner_group(pack_path): if pack_group: LOG.debug('Changing owner group of "%s" directory to %s' % (pack_path, pack_group)) - exit_code, _, stderr, _ = shell.run_command(['sudo', 'chgrp', '-R', pack_group, pack_path]) + + if SUDO_BINARY: + args = ['sudo', 'chgrp', '-R', pack_group, pack_path] + else: + # Environments where sudo is not available (e.g. docker) + args = ['chgrp', '-R', pack_group, pack_path] + + exit_code, _, stderr, _ = shell.run_command(args) if exit_code != 0: # Non fatal, but we still log it