Merge branch 'master' into issue_4260/json_output_pack_install
* master:
  Allow user to specify PYTHON_VERSION env variable.
  Add a changelog entry.
  Add a NOTE on old and deprecated timer options.
  Fix an issue with timers engine config options not working as advertised.
  Timers engine options are also used by st2notifier so they need to be registered as common options inside st2common.
  Make sure we correctly pass local timezone specified in the st2.conf to each BlockingScheduler class constructor.
  Remove debug code.
  Make sure we run pack tests under Python 3 on Travis.
  Fix typo.
  Fix python 3 related issues.
  Add tox target for running pack tests under Python 3.
  Add missing st2-garbagecollector service to launchdev script.
  Fix launchdev not correctly killing gunicorn processes.
  Fix launchdev so it correctly uses VIRTUALENV variable everywhere.
  Fix a bug with garbage collection not working correctly if config options are not set.
  Allow user to set VIRTUALENV environment variable for launchdev.sh script.
  Invoke runner post run on completion of orquesta workflows
  Fix cascade of workflow user context to orquesta tasks
  Minor edit to request_resume to help troubleshoot CI failure
  Use st2common methods to iterate drivers.
Lakshmi Kannan committed Nov 6, 2018
2 parents 8f0c754 + 3ac37d0 commit 8847a03
Showing 18 changed files with 174 additions and 96 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
@@ -69,6 +69,12 @@ Fixed
CLI incorrectly tried to parse that string as a unicode escape sequence.

Reported by James E. King III @jeking3 (bug fix) #4407
* Fix a bug so the ``timersengine`` config section in ``st2.conf`` takes precedence over the ``timer``
section when explicitly specified in the config file.

Also fix a bug where default config values for the ``timer`` section were used even if the user only
specified the ``timersengine`` section in the config. Previously, user options were incorrectly
ignored in favor of the default values. (bug fix) #4424
* ``st2 pack install -j`` now emits only JSON output. Similarly, ``st2 pack install -y`` emits only
YAML output. This change enables the output to be parsed by tools.
The behavior of ``st2 pack install`` hasn't changed and remains human-friendly. If you want to get meta
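A minimal sketch of consuming the new machine-readable output, assuming ``st2 pack install -j`` prints a single JSON document to stdout (the pack name below is illustrative):

    import json
    import subprocess

    # Hypothetical consumer of "st2 pack install -j": capture stdout and parse it.
    # The pack name is only an example.
    output = subprocess.check_output(['st2', 'pack', 'install', 'examples', '-j'])
    result = json.loads(output.decode('utf-8'))
    print(result)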
5 changes: 2 additions & 3 deletions Makefile
@@ -13,7 +13,7 @@ else
VIRTUALENV_ST2CLIENT_DIR ?= virtualenv-st2client
endif

PYTHON_VERSION = python2.7
PYTHON_VERSION ?= python2.7

BINARIES := bin

@@ -46,8 +46,6 @@ COMPONENTS_TEST_MODULES_COMMA := $(subst $(space_char),$(comma),$(COMPONENTS_TES
COVERAGE_GLOBS := .coverage.unit.* .coverage.integration.* .coverage.mistral.*
COVERAGE_GLOBS_QUOTED := $(foreach glob,$(COVERAGE_GLOBS),'$(glob)')

PYTHON_TARGET := 2.7

REQUIREMENTS := test-requirements.txt requirements.txt
PIP_OPTIONS := $(ST2_PIP_OPTIONS)

@@ -829,6 +827,7 @@ ci-py3-unit:
@echo "==================== ci-py3-unit ===================="
@echo
NOSE_WITH_TIMER=$(NOSE_WITH_TIMER) tox -e py36-unit -vv
NOSE_WITH_TIMER=$(NOSE_WITH_TIMER) tox -e py36-packs -vv

.PHONY: ci-py3-integration
ci-py3-integration: requirements .ci-prepare-integration .ci-py3-integration
12 changes: 6 additions & 6 deletions conf/st2.conf.sample
@@ -325,12 +325,12 @@ ssh_key_file = /home/stanley/.ssh/stanley_rsa
user = stanley

[timer]
# Specify to enable timer service.
enable = True
# Timezone pertaining to the location where st2 is run.
local_timezone = America/Los_Angeles
# Location of the logging configuration file.
logging = /etc/st2/logging.timersengine.conf
# Specify to enable timer service. NOTE: Deprecated in favor of timersengine.enable
enable = None
# Timezone pertaining to the location where st2 is run. NOTE: Deprecated in favor of timersengine.local_timezone
local_timezone = None
# Location of the logging configuration file. NOTE: Deprecated in favor of timersengine.logging
logging = None

[timersengine]
# Specify to enable timer service.
10 changes: 5 additions & 5 deletions contrib/packs/tests/test_action_download.py
@@ -107,7 +107,7 @@ def tearDown(self):
def test_run_pack_download(self):
action = self.get_action_instance()
result = action.run(packs=['test'], abs_repo_base=self.repo_base)
temp_dir = hashlib.md5(PACK_INDEX['test']['repo_url']).hexdigest()
temp_dir = hashlib.md5(PACK_INDEX['test']['repo_url'].encode()).hexdigest()

self.assertEqual(result, {'test': 'Success.'})
self.clone_from.assert_called_once_with(PACK_INDEX['test']['repo_url'],
@@ -131,8 +131,8 @@ def test_run_pack_download_multiple_packs(self):
action = self.get_action_instance()
result = action.run(packs=['test', 'test2'], abs_repo_base=self.repo_base)
temp_dirs = [
hashlib.md5(PACK_INDEX['test']['repo_url']).hexdigest(),
hashlib.md5(PACK_INDEX['test2']['repo_url']).hexdigest()
hashlib.md5(PACK_INDEX['test']['repo_url'].encode()).hexdigest(),
hashlib.md5(PACK_INDEX['test2']['repo_url'].encode()).hexdigest()
]

self.assertEqual(result, {'test': 'Success.', 'test2': 'Success.'})
@@ -160,7 +160,7 @@ def test_run_pack_download_no_tag(self):

def test_run_pack_lock_is_already_acquired(self):
action = self.get_action_instance()
temp_dir = hashlib.md5(PACK_INDEX['test']['repo_url']).hexdigest()
temp_dir = hashlib.md5(PACK_INDEX['test']['repo_url'].encode()).hexdigest()

original_acquire = LockFile.acquire

@@ -186,7 +186,7 @@ def mock_acquire(self, timeout=None):
def test_run_pack_lock_is_already_acquired_force_flag(self):
# Lock is already acquired but force is true so it should be deleted and released
action = self.get_action_instance()
temp_dir = hashlib.md5(PACK_INDEX['test']['repo_url']).hexdigest()
temp_dir = hashlib.md5(PACK_INDEX['test']['repo_url'].encode()).hexdigest()

original_acquire = LockFile.acquire

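The ``.encode()`` calls added above are the Python 3 compatibility fix: ``hashlib.md5()`` accepts only bytes on Python 3, while it also accepted ``str`` on Python 2. A standalone illustration (the URL is just an example value):

    import hashlib

    repo_url = 'https://example.com/packs/test.git'  # example value

    # Encoding to bytes first works on both Python 2 and Python 3; passing the
    # unencoded string raises TypeError on Python 3.
    temp_dir = hashlib.md5(repo_url.encode()).hexdigest()
    print(temp_dir)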
15 changes: 14 additions & 1 deletion contrib/runners/orquesta_runner/tests/unit/test_basic.py
@@ -37,6 +37,7 @@
from st2common.persistence import liveaction as lv_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.runners import base as runners
from st2common.runners import utils as runners_utils
from st2common.services import action as ac_svc
from st2common.services import policies as pc_svc
from st2common.services import workflows as wf_svc
@@ -98,7 +99,12 @@ def setUpClass(cls):
def get_runner_class(cls, runner_name):
return runners.get_runner(runner_name, runner_name).__class__

@mock.patch.object(
runners_utils,
'invoke_post_run',
mock.MagicMock(return_value=None))
def test_run_workflow(self):
username = 'stanley'
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
wf_input = {'who': 'Thanos'}
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input)
@@ -129,7 +135,7 @@ def test_run_workflow(self):
'workflow_execution_id': str(wf_ex_db.id),
'action_execution_id': str(ac_ex_db.id),
'api_url': 'http://127.0.0.1/v1',
'user': 'stanley'
'user': username
}
}

@@ -163,6 +169,7 @@ def test_run_workflow(self):
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
self.assertEqual(tk1_lv_ac_db.context.get('user'), username)
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
self.assertTrue(wf_svc.is_action_execution_under_workflow_context(tk1_ac_ex_db))

@@ -180,6 +187,7 @@ def test_run_workflow(self):
tk2_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk2_ex_db.id))[0]
tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk2_ac_ex_db.liveaction['id'])
self.assertEqual(tk2_lv_ac_db.context.get('user'), username)
self.assertEqual(tk2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
self.assertTrue(wf_svc.is_action_execution_under_workflow_context(tk2_ac_ex_db))

@@ -197,6 +205,7 @@ def test_run_workflow(self):
tk3_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
tk3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk3_ex_db.id))[0]
tk3_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk3_ac_ex_db.liveaction['id'])
self.assertEqual(tk3_lv_ac_db.context.get('user'), username)
self.assertEqual(tk3_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
self.assertTrue(wf_svc.is_action_execution_under_workflow_context(tk3_ac_ex_db))

@@ -211,6 +220,10 @@ def test_run_workflow(self):
ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

# Check post run is invoked for the liveaction.
self.assertTrue(runners_utils.invoke_post_run.called)
self.assertEqual(runners_utils.invoke_post_run.call_count, 1)

# Check workflow output.
expected_output = {'msg': '%s, All your base are belong to us!' % wf_input['who']}

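The ``mock.patch.object`` decorator above swaps ``runners_utils.invoke_post_run`` for a ``MagicMock`` for the duration of the test, so the post-run hook can be asserted on without actually running it. A stripped-down, self-contained sketch of the same pattern (the namespace below is an illustrative stand-in, not the real st2common module):

    import mock  # on Python 3, unittest.mock behaves the same

    class runners_utils(object):  # illustrative stand-in for st2common.runners.utils
        @staticmethod
        def invoke_post_run(liveaction_db):
            raise AssertionError('the real implementation should not run in unit tests')

    @mock.patch.object(runners_utils, 'invoke_post_run', mock.MagicMock(return_value=None))
    def test_post_run_invoked_once():
        # While the patch is active the attribute is a MagicMock, so the test can
        # call through it and then assert on how it was used.
        runners_utils.invoke_post_run('fake-liveaction-db')
        assert runners_utils.invoke_post_run.call_count == 1

    test_post_run_invoked_once()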
3 changes: 2 additions & 1 deletion st2actions/st2actions/notifier/scheduler.py
@@ -37,7 +37,8 @@


def get_rescheduler():
timer = BlockingScheduler()
local_tz = cfg.CONF.timer.local_timezone or cfg.CONF.timersengine.local_timezone
timer = BlockingScheduler(timezone=local_tz)

time_spec = {
'seconds': cfg.CONF.scheduler.rescheduling_interval,
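For context, the rescheduler above is an APScheduler ``BlockingScheduler``; passing the configured timezone at construction time, rather than relying on the library default, is the point of the fix. A minimal standalone sketch (the timezone value, interval, and job body are illustrative):

    from apscheduler.schedulers.blocking import BlockingScheduler

    local_tz = 'America/Los_Angeles'  # illustrative; normally read from st2.conf
    timer = BlockingScheduler(timezone=local_tz)

    def recover_delayed_executions():
        print('checking for delayed executions')  # placeholder job body

    # Same shape as the time_spec used above: run the job on a fixed interval.
    timer.add_job(recover_delayed_executions, trigger='interval', seconds=120)
    # timer.start()  # start() blocks, so it is left commented out in this sketch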
3 changes: 2 additions & 1 deletion st2api/tests/unit/controllers/v1/test_executions.py
@@ -1090,7 +1090,8 @@ def test_put_resume_not_paused(self):
updates = {'status': 'resuming'}
put_resp = self._do_put(execution_id, updates, expect_errors=True)
self.assertEqual(put_resp.status_int, 400)
self.assertIn('is not in a paused state', put_resp.json['faultstring'])
expected_error_message = 'it is in "pausing" state and not in "paused" state'
self.assertIn(expected_error_message, put_resp.json['faultstring'])

get_resp = self._do_get_one(execution_id)
self.assertEqual(get_resp.status_int, 200)
42 changes: 42 additions & 0 deletions st2common/st2common/config.py
@@ -567,6 +567,48 @@ def register_opts(ignore_errors=False):

do_register_opts(metrics_opts, group='metrics', ignore_errors=ignore_errors)

# Common timers engine options
timer_logging_opts = [
cfg.StrOpt(
'logging', default=None,
help='Location of the logging configuration file. '
'NOTE: Deprecated in favor of timersengine.logging'),
]

timers_engine_logging_opts = [
cfg.StrOpt(
'logging', default='/etc/st2/logging.timersengine.conf',
help='Location of the logging configuration file.')
]

do_register_opts(timer_logging_opts, group='timer', ignore_errors=ignore_errors)
do_register_opts(timers_engine_logging_opts, group='timersengine', ignore_errors=ignore_errors)

# NOTE: We default old style deprecated "timer" options to None so our code
# works correctly and "timersengine" has precedence over "timer"
# NOTE: "timer" section will be removed in v3.1
timer_opts = [
cfg.StrOpt(
'local_timezone', default=None,
help='Timezone pertaining to the location where st2 is run. '
'NOTE: Deprecated in favor of timersengine.local_timezone'),
cfg.BoolOpt(
'enable', default=None,
help='Specify to enable timer service. '
'NOTE: Deprecated in favor of timersengine.enable'),
]

timers_engine_opts = [
cfg.StrOpt(
'local_timezone', default='America/Los_Angeles',
help='Timezone pertaining to the location where st2 is run.'),
cfg.BoolOpt(
'enable', default=True,
help='Specify to enable timer service.')
]
do_register_opts(timer_opts, group='timer', ignore_errors=ignore_errors)
do_register_opts(timers_engine_opts, group='timersengine', ignore_errors=ignore_errors)


def parse_args(args=None):
register_opts()
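The registration above implements the deprecation scheme: the old ``[timer]`` options default to ``None``, so a lookup of the form ``deprecated value or new value`` only honours ``[timer]`` when the operator set it explicitly and otherwise falls through to ``[timersengine]``. A self-contained sketch of the same pattern with oslo.config (option names mirror the ones registered above; no config file is read):

    from oslo_config import cfg

    CONF = cfg.ConfigOpts()

    CONF.register_opts([
        cfg.StrOpt('local_timezone', default=None,
                   help='Deprecated in favor of timersengine.local_timezone'),
    ], group='timer')

    CONF.register_opts([
        cfg.StrOpt('local_timezone', default='America/Los_Angeles',
                   help='Timezone pertaining to the location where st2 is run.'),
    ], group='timersengine')

    CONF(args=[])  # parse nothing; rely on defaults for this sketch

    local_tz = CONF.timer.local_timezone or CONF.timersengine.local_timezone
    print(local_tz)  # America/Los_Angeles unless [timer] is explicitly configured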
16 changes: 8 additions & 8 deletions st2common/st2common/models/db/trace.py
@@ -99,14 +99,14 @@ def get_uid(self):
parts = []
parts.append(self.RESOURCE_TYPE)

componenets_hash = hashlib.md5()
componenets_hash.update(str(self.trace_tag).encode())
componenets_hash.update(str(self.trigger_instances).encode())
componenets_hash.update(str(self.rules).encode())
componenets_hash.update(str(self.action_executions).encode())
componenets_hash.update(str(self.start_timestamp).encode())

parts.append(componenets_hash.hexdigest())
components_hash = hashlib.md5()
components_hash.update(str(self.trace_tag).encode())
components_hash.update(str(self.trigger_instances).encode())
components_hash.update(str(self.rules).encode())
components_hash.update(str(self.action_executions).encode())
components_hash.update(str(self.start_timestamp).encode())

parts.append(components_hash.hexdigest())

uid = self.UID_SEPARATOR.join(parts)
return uid
1 change: 1 addition & 0 deletions st2common/st2common/runners/__init__.py
@@ -16,6 +16,7 @@
# Note: Imports are in-line to avoid large import time overhead

from __future__ import absolute_import

__all__ = [
'BACKENDS_NAMESPACE',

11 changes: 8 additions & 3 deletions st2common/st2common/services/action.py
@@ -332,14 +332,19 @@ def request_resume(liveaction, requester):
'"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
)

if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING:
running_states = [
action_constants.LIVEACTION_STATUS_RUNNING,
action_constants.LIVEACTION_STATUS_RESUMING
]

if liveaction.status in running_states:
execution = ActionExecution.get(liveaction__id=str(liveaction.id))
return (liveaction, execution)

if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
raise runner_exc.UnexpectedActionExecutionStatusError(
'Unable to resume liveaction "%s" because it is not in a paused state.'
% liveaction.id
'Unable to resume liveaction "%s" because it is in "%s" state and '
'not in "paused" state.' % (liveaction.id, liveaction.status)
)

liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)
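The reworked guard above makes a resume request a no-op when the execution is already running or resuming, and rejects anything that is not paused with an error naming the actual state. A standalone sketch of the same control flow (the status strings stand in for the ``LIVEACTION_STATUS_*`` constants):

    RUNNING_STATES = ['running', 'resuming']  # illustrative stand-ins

    def check_resume_allowed(liveaction_id, status):
        if status in RUNNING_STATES:
            return False  # already running; treat the resume request as a no-op
        if status != 'paused':
            raise ValueError(
                'Unable to resume liveaction "%s" because it is in "%s" state and '
                'not in "paused" state.' % (liveaction_id, status))
        return True  # paused, so the transition to resuming is allowed

    assert check_resume_allowed('abc123', 'resuming') is False
    assert check_resume_allowed('abc123', 'paused') is True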
16 changes: 12 additions & 4 deletions st2common/st2common/services/workflows.py
@@ -38,6 +38,7 @@
from st2common.persistence import liveaction as lv_db_access
from st2common.persistence import execution as ex_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.runners import utils as runners_utils
from st2common.services import action as ac_svc
from st2common.services import executions as ex_svc
from st2common.util import action_db as action_utils
@@ -813,7 +814,8 @@ def request_next_tasks(wf_ex_db, task_ex_id=None):
for task in next_tasks:
try:
LOG.info('[%s] Requesting execution for task "%s".', wf_ac_ex_id, task['id'])
st2_ctx = {'execution_id': wf_ex_db.action_execution}
root_st2_ctx = wf_ex_db.context.get('st2', {})
st2_ctx = {'execution_id': wf_ac_ex_id, 'user': root_st2_ctx.get('user')}
request_task_execution(wf_ex_db, st2_ctx, task)
except Exception as e:
LOG.exception('[%s] Failed task execution for "%s".', wf_ac_ex_id, task['id'])
@@ -941,12 +943,13 @@ def update_execution_records(wf_ex_db, conductor, update_lv_ac_on_states=None,
pub_wf_ex=False, pub_lv_ac=True, pub_ac_ex=True):

wf_ac_ex_id = wf_ex_db.action_execution
wf_old_status = wf_ex_db.status

# Update workflow status.
# Determine if workflow status has changed.
wf_old_status = wf_ex_db.status
wf_ex_db.status = conductor.get_workflow_state()
status_changed = (wf_old_status != wf_ex_db.status)

if wf_old_status != wf_ex_db.status:
if status_changed:
msg = '[%s] Updating workflow execution from state "%s" to "%s".'
LOG.info(msg, wf_ac_ex_id, wf_old_status, wf_ex_db.status)

@@ -1001,3 +1004,8 @@ def update_execution_records(wf_ex_db, conductor, update_lv_ac_on_states=None,
publish=pub_lv_ac)

ex_svc.update_execution(wf_lv_ac_db, publish=pub_ac_ex)

# Invoke post run on the liveaction for the workflow execution.
if status_changed and wf_lv_ac_db.status in ac_const.LIVEACTION_COMPLETED_STATES:
LOG.info('[%s] Workflow action execution is completed and invoking post run.', wf_ac_ex_id)
runners_utils.invoke_post_run(wf_lv_ac_db)
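The two changes above cascade the requesting user from the root workflow context into each task request, and invoke the runner post run once, when the workflow liveaction first reaches a completed state. A minimal sketch of the context cascade (the dict shapes mirror the diff; values are illustrative):

    # Illustrative root workflow context and action execution id.
    wf_ex_db_context = {'st2': {'user': 'stanley', 'api_url': 'http://127.0.0.1/v1'}}
    wf_ac_ex_id = 'example-action-execution-id'

    # Same shape as request_next_tasks() above: copy the user from the root
    # st2 context into the context handed to every task execution request.
    root_st2_ctx = wf_ex_db_context.get('st2', {})
    st2_ctx = {'execution_id': wf_ac_ex_id, 'user': root_st2_ctx.get('user')}

    assert st2_ctx['user'] == 'stanley'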
2 changes: 1 addition & 1 deletion st2common/st2common/util/pack_management.py
@@ -96,7 +96,7 @@ def download_pack(pack, abs_repo_base='/opt/stackstorm/packs', verify_ssl=True,

result = [pack_url, None, None]

temp_dir_name = hashlib.md5(pack_url).hexdigest()
temp_dir_name = hashlib.md5(pack_url.encode()).hexdigest()
lock_file = LockFile('/tmp/%s' % (temp_dir_name))
lock_file_path = lock_file.lock_file

7 changes: 4 additions & 3 deletions st2reactor/st2reactor/garbage_collector/base.py
@@ -132,21 +132,22 @@ def _perform_garbage_collection(self):

# Note: We sleep for a bit between garbage collection of each object type to prevent busy
# waiting
if self._action_executions_ttl >= MINIMUM_TTL_DAYS:
if self._action_executions_ttl and self._action_executions_ttl >= MINIMUM_TTL_DAYS:
self._purge_action_executions()
eventlet.sleep(self._sleep_delay)
else:
LOG.debug('Skipping garbage collection for action executions since it\'s not '
'configured')

if self._action_executions_output_ttl >= MINIMUM_TTL_DAYS_EXECUTION_OUTPUT:
if self._action_executions_output_ttl and \
self._action_executions_output_ttl >= MINIMUM_TTL_DAYS_EXECUTION_OUTPUT:
self._purge_action_executions_output()
eventlet.sleep(self._sleep_delay)
else:
LOG.debug('Skipping garbage collection for action executions output since it\'s not '
'configured')

if self._trigger_instances_ttl >= MINIMUM_TTL_DAYS:
if self._trigger_instances_ttl and self._trigger_instances_ttl >= MINIMUM_TTL_DAYS:
self._purge_trigger_instances()
eventlet.sleep(self._sleep_delay)
else:
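The added truthiness checks above skip a purge pass entirely when its TTL option is not configured, instead of comparing an unset value against the minimum (on Python 3, ``None >= 7`` raises ``TypeError``). A standalone sketch of the guard (the constant mirrors ``MINIMUM_TTL_DAYS``; values are illustrative):

    MINIMUM_TTL_DAYS = 7  # mirrors the constant used in the garbage collector

    def should_purge(ttl_days):
        # An unset TTL (None) disables this purge pass; only a configured value
        # is compared against the minimum.
        return bool(ttl_days) and ttl_days >= MINIMUM_TTL_DAYS

    assert should_purge(None) is False
    assert should_purge(3) is False
    assert should_purge(30) is True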