Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement with items in task of native spec #94

Merged
merged 24 commits into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cd7a5da
Add zip function to expression catalog
m4dcoder Sep 18, 2018
26e8dc5
Add util function to unescape chars in string
m4dcoder Sep 19, 2018
49dace9
Add with items support to native task spec
m4dcoder Sep 20, 2018
2542797
Support both inline and fully modeled with items in task spec
m4dcoder Sep 22, 2018
2f705df
Add unit tests to check variables in with items
m4dcoder Sep 26, 2018
3e57699
Add item function to retrieve current item from the context
m4dcoder Sep 28, 2018
565bccd
Support with items in task rendering
m4dcoder Sep 28, 2018
f67b55d
Add more tests for repeated calls to get_next_tasks
m4dcoder Oct 1, 2018
33dd0bb
Add context to the execution event
m4dcoder Oct 3, 2018
4eec26d
Remove conductor get_start_tasks in favor of get_next_tasks
m4dcoder Oct 3, 2018
37b37c4
Add support for concurrency in task with items
m4dcoder Oct 8, 2018
00cb063
Add examples for task with items
m4dcoder Oct 8, 2018
ccb0435
Move the execution of the engine commands until the current task is p…
m4dcoder Oct 9, 2018
2db4002
Minor code clean up in update_task_flow
m4dcoder Oct 9, 2018
45595b1
Refactor rendering of task spec in conductor
m4dcoder Oct 9, 2018
a70b0b7
Add coverage for pause and pending of action executions
m4dcoder Oct 10, 2018
2c5c876
Add coverage for canceling of action executions
m4dcoder Oct 10, 2018
d029fad
Allow a cycle for a task with items to be handled properly
m4dcoder Oct 11, 2018
f4c8eee
Fix state change for task with items on workflow pause, resume, and c…
m4dcoder Oct 12, 2018
b3b2c03
Add basic performance test on the size of an items list
m4dcoder Oct 15, 2018
7e28f62
Fix expression evaluation at with items concurrency
m4dcoder Oct 17, 2018
400682b
Handle workflow paused and canceled state for with items
m4dcoder Oct 17, 2018
479227a
Minor refactor to use enumerate for iterating a list
m4dcoder Oct 18, 2018
85e86c4
Add docs section for task with items
m4dcoder Oct 19, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 101 additions & 41 deletions orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import logging
import six

from six.moves import queue

from orquesta import events
from orquesta import exceptions as exc
from orquesta.expressions import base as expr
Expand Down Expand Up @@ -56,6 +58,9 @@ def deserialize(cls, data):

return instance

def get_task(self, task_id):
    """Return the task flow entry recorded in the sequence for the given task id."""
    seq_idx = self.tasks[task_id]
    return self.sequence[seq_idx]

def get_tasks_by_state(self, states):
    """Return all task flow entries whose state is one of the given states."""
    matching = []

    for entry in self.sequence:
        if entry['state'] in states:
            matching.append(entry)

    return matching

Expand Down Expand Up @@ -86,6 +91,16 @@ def get_staged_tasks(self):
def has_staged_tasks(self):
    """Return True if at least one task is currently staged for execution."""
    return bool(self.get_staged_tasks())

def remove_staged_task(self, task_id):
    """Unstage the given task unless any of its with-items are still running.

    A staged task that has one or more items in an active state is kept in
    the staging area so the remaining item executions can be tracked.
    """
    if task_id not in self.staged:
        return

    items = self.staged[task_id].get('items', [])
    still_running = any(item['state'] in states.ACTIVE_STATES for item in items)

    if not still_running:
        del self.staged[task_id]


class WorkflowConductor(object):

Expand Down Expand Up @@ -236,8 +251,14 @@ def request_workflow_state(self, state):
# Record current workflow state.
current_state = self.get_workflow_state()

# Process state change event.
# Create an event for the request.
wf_ex_event = events.WorkflowExecutionEvent(state)

# Push the event to all the active tasks. The event may trigger state changes to the task.
for task in self.flow.get_tasks_by_state(states.ACTIVE_STATES):
machines.TaskStateMachine.process_event(self, task, wf_ex_event)

# Process the workflow state change event.
machines.WorkflowStateMachine.process_event(self, wf_ex_event)

# Get workflow state after event is processed.
Expand Down Expand Up @@ -304,12 +325,6 @@ def _render_workflow_outputs(self):
def get_workflow_output(self):
return copy.deepcopy(self._outputs) if self._outputs else None

def _render_task_spec(self, task_name, ctx_value):
    """Return a copy of the named task spec with its action and input
    expressions evaluated against the given context value."""
    rendered = self.spec.tasks.get_task(task_name).copy()
    rendered.action = expr.evaluate(rendered.action, ctx_value)
    rendered.input = expr.evaluate(getattr(rendered, 'input', {}), ctx_value)
    return rendered

def _inbound_criteria_satisfied(self, task_id):
inbounds = self.graph.get_prev_transitions(task_id)
inbounds_satisfied = []
Expand Down Expand Up @@ -341,34 +356,47 @@ def get_task(self, task_id):

current_task = {'id': task_id, 'name': task_name}
task_ctx = ctx.set_current_task(task_ctx, current_task)
task_spec = self._render_task_spec(task_name, task_ctx)
task_spec = self.spec.tasks.get_task(task_name).copy()
task_spec, action_specs = task_spec.render(task_ctx)

return {
task = {
'id': task_id,
'name': task_name,
'ctx': task_ctx,
'spec': task_spec
'spec': task_spec,
'actions': action_specs
}

def get_start_tasks(self):
if self.get_workflow_state() not in states.RUNNING_STATES:
return []
if task_spec.has_items():
items_spec = getattr(task_spec, 'with')
task['items_count'] = len(action_specs)
task['concurrency'] = getattr(items_spec, 'concurrency', None)

tasks = []
return task

for task_node in self.graph.roots:
try:
tasks.append(self.get_task(task_node['id']))
except Exception as e:
self.log_error(str(e), task_id=task_node['id'])
self.request_workflow_state(states.FAILED)
continue
def _evaluate_task_actions(self, task):
task_id = task['id']

# Return nothing if there is error(s) on determining start tasks.
if self.get_workflow_state() in states.COMPLETED_STATES:
return []
# Check if task is with items.
if task['spec'].has_items():
# Prepare the staging task to track items execution status.
if 'items' not in self.flow.staged[task_id] or not self.flow.staged[task_id]['items']:
self.flow.staged[task_id]['items'] = [{'state': states.UNSET}] * task['items_count']

return sorted(tasks, key=lambda x: x['name'])
# Trim the list of actions in the task per concurrency policy.
all_items = list(zip(task['actions'], self.flow.staged[task_id]['items']))
active_items = list(filter(lambda x: x[1]['state'] in states.ACTIVE_STATES, all_items))
notrun_items = list(filter(lambda x: x[1]['state'] == states.UNSET, all_items))

if task['concurrency'] is not None:
availability = task['concurrency'] - len(active_items)
candidates = list(zip(*notrun_items[:availability]))
task['actions'] = list(candidates[0]) if candidates and availability > 0 else []
else:
candidates = list(zip(*notrun_items))
task['actions'] = list(candidates[0]) if candidates else []

return task

def has_next_tasks(self, task_id=None):
next_tasks = []
Expand Down Expand Up @@ -408,7 +436,11 @@ def get_next_tasks(self, task_id=None):
if not task_id:
for staged_task_id in self.flow.get_staged_tasks():
try:
next_tasks.append(self.get_task(staged_task_id))
next_task = self.get_task(staged_task_id)
next_task = self._evaluate_task_actions(next_task)

if 'actions' in next_task and len(next_task['actions']) > 0:
next_tasks.append(next_task)
except Exception as e:
self.log_error(str(e), task_id=staged_task_id)
self.request_workflow_state(states.FAILED)
Expand Down Expand Up @@ -440,7 +472,11 @@ def get_next_tasks(self, task_id=None):
continue

try:
next_tasks.append(self.get_task(next_task_id))
next_task = self.get_task(next_task_id)
next_task = self._evaluate_task_actions(next_task)

if 'actions' in next_task and len(next_task['actions']) > 0:
next_tasks.append(next_task)
except Exception as e:
self.log_error(str(e), task_id=next_task_id)
self.request_workflow_state(states.FAILED)
Expand Down Expand Up @@ -472,6 +508,7 @@ def add_task_flow(self, task_id, in_ctx_idx=None):

def update_task_flow(self, task_id, event):
in_ctx_idx = 0
engine_event_queue = queue.Queue()

# Throw exception if not expected event type.
if not issubclass(type(event), events.ExecutionEvent):
Expand All @@ -488,8 +525,8 @@ def update_task_flow(self, task_id, event):
if task_id not in self.flow.staged and not task_flow_entry:
raise exc.InvalidTaskFlowEntry(task_id)

# Remove the task from the staged list if it is processed.
if event.state and task_id in self.flow.staged:
# Get the incoming context from the staged task.
if task_id in self.flow.staged:
in_ctx_idxs = self.flow.staged[task_id]['ctxs']

if len(in_ctx_idxs) <= 0 or all(x == in_ctx_idxs[0] for x in in_ctx_idxs):
Expand All @@ -499,8 +536,6 @@ def update_task_flow(self, task_id, event):
self.flow.contexts.append(new_ctx_entry)
in_ctx_idx = len(self.flow.contexts) - 1

del self.flow.staged[task_id]

# Create new task flow entry if it does not exist.
if not task_flow_entry:
task_flow_entry = self.add_task_flow(task_id, in_ctx_idx=in_ctx_idx)
Expand All @@ -509,32 +544,55 @@ def update_task_flow(self, task_id, event):
if self.graph.in_cycle(task_id) and task_flow_entry.get('state') in states.COMPLETED_STATES:
task_flow_entry = self.add_task_flow(task_id, in_ctx_idx=in_ctx_idx)

# Remove task from staging if task is not with items.
if event.state and task_id in self.flow.staged and 'items' not in self.flow.staged[task_id]:
del self.flow.staged[task_id]

# If action execution is for a task item, then store the execution state for the item.
if (event.state and event.context and
'item_id' in event.context and event.context['item_id'] is not None):
item_result = {'state': event.state, 'result': event.result}
self.flow.staged[task_id]['items'][event.context['item_id']] = item_result

# Log the error if it is a failed execution event.
if event.state == states.FAILED:
message = 'Execution failed. See result for details.'
self.log_error(message, task_id=task_id, result=event.result)

# Process the action execution event using the task state machine and update the task state.
machines.TaskStateMachine.process_event(task_flow_entry, event)
old_task_state = task_flow_entry.get('state', states.UNSET)
machines.TaskStateMachine.process_event(self, task_flow_entry, event)
new_task_state = task_flow_entry.get('state', states.UNSET)

# Evaluate task transitions if task is in completed state.
if task_flow_entry['state'] in states.COMPLETED_STATES:
# Get task result and set current context if task is completed.
if new_task_state in states.COMPLETED_STATES:
# Get task details required for updating outgoing context.
task_node = self.graph.get_task(task_id)
task_name = task_node['name']
task_spec = self.spec.tasks.get_task(task_name)
task_flow_idx = self._get_task_flow_idx(task_id)

# Get task result.
task_result = (
[item.get('result') for item in self.flow.staged[task_id]['items']]
if task_spec.has_items() else event.result
)

# Remove remaining task from staging.
self.flow.remove_staged_task(task_id)

# Set current task in the context.
in_ctx_idx = task_flow_entry['ctx']
in_ctx_val = self.flow.contexts[in_ctx_idx]['value']
current_task = {'id': task_id, 'name': task_name, 'result': event.result}
current_task = {'id': task_id, 'name': task_name, 'result': task_result}
current_ctx = ctx.set_current_task(in_ctx_val, current_task)

# Setup context for evaluating expressions in task transition criteria.
flow_ctx = {'__flow': self.flow.serialize()}
current_ctx = dx.merge_dicts(current_ctx, flow_ctx, True)

# Evaluate task transitions if task is completed and state change is not processed.
if new_task_state in states.COMPLETED_STATES and new_task_state != old_task_state:
# Identify task transitions for the current completed task.
task_transitions = self.graph.get_next_transitions(task_id)

Expand Down Expand Up @@ -593,17 +651,19 @@ def update_task_flow(self, task_id, event):
self.flow.staged[task_transition[1]] = staging_data

# If the next task is noop, then mark the task as completed.
if next_task_name == 'noop':
self.update_task_flow(next_task_id, events.TaskNoopEvent())

# If the next task is fail, then fail the workflow.
if next_task_name == 'fail':
self.update_task_flow(next_task_id, events.TaskFailEvent())
if next_task_name in events.ENGINE_EVENT_MAP.keys():
engine_event_queue.put((next_task_id, next_task_name))

# Process the task event using the workflow state machine and update the workflow state.
task_ex_event = events.TaskExecutionEvent(task_id, task_flow_entry['state'])
machines.WorkflowStateMachine.process_event(self, task_ex_event)

# Process any engine commands in the queue.
while not engine_event_queue.empty():
next_task_id, next_task_name = engine_event_queue.get()
engine_event = events.ENGINE_EVENT_MAP[next_task_name]
self.update_task_flow(next_task_id, engine_event())

# Render workflow output if workflow is completed.
if self.get_workflow_state() in states.COMPLETED_STATES:
in_ctx_idx = task_flow_entry['ctx']
Expand Down
Loading