New Applier Workflow Engine

This patch set allows to execute Watcher Actions in parallel.
New config option max_workers sets max number of threads
to work.

Implements: blueprint parallel-applier
Change-Id: Ie4f3ed7e75936b434d308aa875eaa49d49f0c613
This commit is contained in:
Alexander Chadin
2017-01-17 17:06:05 +03:00
committed by Vincent Françoise
parent 41f579d464
commit 6e09cdb5ac
4 changed files with 120 additions and 26 deletions

View File

@@ -15,10 +15,12 @@
# limitations under the License. # limitations under the License.
# #
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log from oslo_log import log
from taskflow import engines from taskflow import engines
from taskflow.patterns import graph_flow as gf from taskflow.patterns import graph_flow as gf
from taskflow import task from taskflow import task as flow_task
from watcher._i18n import _LE, _LW, _LC from watcher._i18n import _LE, _LW, _LC
from watcher.applier.workflow_engine import base from watcher.applier.workflow_engine import base
@@ -48,6 +50,18 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
# (True to allow v execution or False to not). # (True to allow v execution or False to not).
return True return True
@classmethod
def get_config_opts(cls):
return [
cfg.IntOpt(
'max_workers',
default=processutils.get_worker_count(),
min=1,
required=True,
help='Number of workers for taskflow engine '
'to execute actions.')
]
def execute(self, actions): def execute(self, actions):
try: try:
# NOTE(jed) We want to have a strong separation of concern # NOTE(jed) We want to have a strong separation of concern
@@ -56,34 +70,32 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
# We want to provide the 'taskflow' engine by # We want to provide the 'taskflow' engine by
# default although we still want to leave the possibility for # default although we still want to leave the possibility for
# the users to change it. # the users to change it.
# todo(jed) we need to change the way the actions are stored. # The current implementation uses graph with linked actions.
# The current implementation only use a linked list of actions.
# todo(jed) add olso conf for retry and name # todo(jed) add olso conf for retry and name
flow = gf.Flow("watcher_flow") flow = gf.Flow("watcher_flow")
previous = None actions_uuid = {}
for a in actions: for a in actions:
task = TaskFlowActionContainer(a, self) task = TaskFlowActionContainer(a, self)
flow.add(task) flow.add(task)
if previous is None: actions_uuid[a.uuid] = task
previous = task
# we have only one Action in the Action Plan
if len(actions) == 1:
nop = TaskFlowNop()
flow.add(nop)
flow.link(previous, nop)
else:
# decider == guard (UML)
flow.link(previous, task, decider=self.decider)
previous = task
e = engines.load(flow) for a in actions:
for parent_id in a.parents:
flow.link(actions_uuid[parent_id], actions_uuid[a.uuid],
decider=self.decider)
e = engines.load(
flow, engine='parallel',
max_workers=self.config.max_workers)
e.run() e.run()
return flow
except Exception as e: except Exception as e:
raise exception.WorkflowExecutionException(error=e) raise exception.WorkflowExecutionException(error=e)
class TaskFlowActionContainer(task.Task): class TaskFlowActionContainer(flow_task.Task):
def __init__(self, db_action, engine): def __init__(self, db_action, engine):
name = "action_type:{0} uuid:{1}".format(db_action.action_type, name = "action_type:{0} uuid:{1}".format(db_action.action_type,
db_action.uuid) db_action.uuid)
@@ -148,7 +160,7 @@ class TaskFlowActionContainer(task.Task):
LOG.critical(_LC("Oops! We need a disaster recover plan.")) LOG.critical(_LC("Oops! We need a disaster recover plan."))
class TaskFlowNop(task.Task): class TaskFlowNop(flow_task.Task):
"""This class is used in case of the workflow have only one Action. """This class is used in case of the workflow have only one Action.
We need at least two atoms to create a link. We need at least two atoms to create a link.

View File

@@ -40,7 +40,7 @@ APPLIER_MANAGER_OPTS = [
cfg.StrOpt('workflow_engine', cfg.StrOpt('workflow_engine',
default='taskflow', default='taskflow',
required=True, required=True,
help='Select the engine to use to execute the workflow') help='Select the engine to use to execute the workflow'),
] ]

View File

@@ -67,20 +67,22 @@ class DummyWithResize(base.DummyBaseStrategy):
action_type='migrate', action_type='migrate',
resource_id='b199db0c-1408-4d52-b5a5-5ca14de0ff36', resource_id='b199db0c-1408-4d52-b5a5-5ca14de0ff36',
input_parameters={ input_parameters={
'source_node': 'server1', 'source_node': 'compute2',
'destination_node': 'server2'}) 'destination_node': 'compute3',
'migration_type': 'live'})
self.solution.add_action( self.solution.add_action(
action_type='migrate', action_type='migrate',
resource_id='8db1b3c1-7938-4c34-8c03-6de14b874f8f', resource_id='8db1b3c1-7938-4c34-8c03-6de14b874f8f',
input_parameters={ input_parameters={
'source_node': 'server1', 'source_node': 'compute2',
'destination_node': 'server2'} 'destination_node': 'compute3',
'migration_type': 'live'}
) )
self.solution.add_action( self.solution.add_action(
action_type='resize', action_type='resize',
resource_id='8db1b3c1-7938-4c34-8c03-6de14b874f8f', resource_id='8db1b3c1-7938-4c34-8c03-6de14b874f8f',
input_parameters={'flavor': 'x1'} input_parameters={'flavor': 'x2'}
) )
def post_execute(self): def post_execute(self):

View File

@@ -59,6 +59,7 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
config=mock.Mock(), config=mock.Mock(),
context=self.context, context=self.context,
applier_manager=mock.MagicMock()) applier_manager=mock.MagicMock())
self.engine.config.max_workers = 2
@mock.patch('taskflow.engines.load') @mock.patch('taskflow.engines.load')
@mock.patch('taskflow.patterns.graph_flow.Flow.link') @mock.patch('taskflow.patterns.graph_flow.Flow.link')
@@ -70,9 +71,9 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
def create_action(self, action_type, parameters, parents): def create_action(self, action_type, parameters, parents, uuid=None):
action = { action = {
'uuid': utils.generate_uuid(), 'uuid': uuid or utils.generate_uuid(),
'action_plan_id': 0, 'action_plan_id': 0,
'action_type': action_type, 'action_type': action_type,
'input_parameters': parameters, 'input_parameters': parameters,
@@ -114,6 +115,85 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
except Exception as exc: except Exception as exc:
self.fail(exc) self.fail(exc)
def test_execute_nop_sleep(self):
actions = []
first_nop = self.create_action("nop", {'message': 'test'}, [])
second_nop = self.create_action("nop", {'message': 'second test'}, [])
sleep = self.create_action("sleep", {'duration': 0.0},
[first_nop.uuid, second_nop.uuid])
actions.extend([first_nop, second_nop, sleep])
try:
self.engine.execute(actions)
self.check_actions_state(actions, objects.action.State.SUCCEEDED)
except Exception as exc:
self.fail(exc)
def test_execute_with_parents(self):
actions = []
first_nop = self.create_action(
"nop", {'message': 'test'}, [],
uuid='bc7eee5c-4fbe-4def-9744-b539be55aa19')
second_nop = self.create_action(
"nop", {'message': 'second test'}, [],
uuid='0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23')
first_sleep = self.create_action(
"sleep", {'duration': 0.0}, [first_nop.uuid, second_nop.uuid],
uuid='be436531-0da3-4dad-a9c0-ea1d2aff6496')
second_sleep = self.create_action(
"sleep", {'duration': 0.0}, [first_sleep.uuid],
uuid='9eb51e14-936d-4d12-a500-6ba0f5e0bb1c')
actions.extend([first_nop, second_nop, first_sleep, second_sleep])
expected_nodes = [
{'uuid': 'bc7eee5c-4fbe-4def-9744-b539be55aa19',
'input_parameters': {u'message': u'test'},
'action_plan_id': 0, 'state': u'PENDING', 'parents': [],
'action_type': u'nop', 'id': 1},
{'uuid': '0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23',
'input_parameters': {u'message': u'second test'},
'action_plan_id': 0, 'state': u'PENDING', 'parents': [],
'action_type': u'nop', 'id': 2},
{'uuid': 'be436531-0da3-4dad-a9c0-ea1d2aff6496',
'input_parameters': {u'duration': 0.0},
'action_plan_id': 0, 'state': u'PENDING',
'parents': [u'bc7eee5c-4fbe-4def-9744-b539be55aa19',
u'0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23'],
'action_type': u'sleep', 'id': 3},
{'uuid': '9eb51e14-936d-4d12-a500-6ba0f5e0bb1c',
'input_parameters': {u'duration': 0.0},
'action_plan_id': 0, 'state': u'PENDING',
'parents': [u'be436531-0da3-4dad-a9c0-ea1d2aff6496'],
'action_type': u'sleep', 'id': 4}]
expected_edges = [
('action_type:nop uuid:0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23',
'action_type:sleep uuid:be436531-0da3-4dad-a9c0-ea1d2aff6496'),
('action_type:nop uuid:bc7eee5c-4fbe-4def-9744-b539be55aa19',
'action_type:sleep uuid:be436531-0da3-4dad-a9c0-ea1d2aff6496'),
('action_type:sleep uuid:be436531-0da3-4dad-a9c0-ea1d2aff6496',
'action_type:sleep uuid:9eb51e14-936d-4d12-a500-6ba0f5e0bb1c')]
try:
flow = self.engine.execute(actions)
actual_nodes = sorted([x[0]._db_action.as_dict()
for x in flow.iter_nodes()],
key=lambda x: x['id'])
for expected, actual in zip(expected_nodes, actual_nodes):
for key in expected.keys():
self.assertIn(expected[key], actual.values())
actual_edges = [(u.name, v.name)
for (u, v, _) in flow.iter_links()]
for edge in expected_edges:
self.assertIn(edge, actual_edges)
self.check_actions_state(actions, objects.action.State.SUCCEEDED)
except Exception as exc:
self.fail(exc)
def test_execute_with_two_actions(self): def test_execute_with_two_actions(self):
actions = [] actions = []
second = self.create_action("sleep", {'duration': 0.0}, None) second = self.create_action("sleep", {'duration': 0.0}, None)