diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index bebe4bd52..21c932010 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -15,10 +15,12 @@ # limitations under the License. # +from oslo_concurrency import processutils +from oslo_config import cfg from oslo_log import log from taskflow import engines from taskflow.patterns import graph_flow as gf -from taskflow import task +from taskflow import task as flow_task from watcher._i18n import _LE, _LW, _LC from watcher.applier.workflow_engine import base @@ -48,6 +50,18 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): # (True to allow v execution or False to not). return True + @classmethod + def get_config_opts(cls): + return [ + cfg.IntOpt( + 'max_workers', + default=processutils.get_worker_count(), + min=1, + required=True, + help='Number of workers for taskflow engine ' + 'to execute actions.') + ] + def execute(self, actions): try: # NOTE(jed) We want to have a strong separation of concern @@ -56,34 +70,32 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): # We want to provide the 'taskflow' engine by # default although we still want to leave the possibility for # the users to change it. - # todo(jed) we need to change the way the actions are stored. - # The current implementation only use a linked list of actions. + # The current implementation uses graph with linked actions. # todo(jed) add olso conf for retry and name flow = gf.Flow("watcher_flow") - previous = None + actions_uuid = {} for a in actions: task = TaskFlowActionContainer(a, self) flow.add(task) - if previous is None: - previous = task - # we have only one Action in the Action Plan - if len(actions) == 1: - nop = TaskFlowNop() - flow.add(nop) - flow.link(previous, nop) - else: - # decider == guard (UML) - flow.link(previous, task, decider=self.decider) - previous = task + actions_uuid[a.uuid] = task - e = engines.load(flow) + for a in actions: + for parent_id in a.parents: + flow.link(actions_uuid[parent_id], actions_uuid[a.uuid], + decider=self.decider) + + e = engines.load( + flow, engine='parallel', + max_workers=self.config.max_workers) e.run() + return flow + except Exception as e: raise exception.WorkflowExecutionException(error=e) -class TaskFlowActionContainer(task.Task): +class TaskFlowActionContainer(flow_task.Task): def __init__(self, db_action, engine): name = "action_type:{0} uuid:{1}".format(db_action.action_type, db_action.uuid) @@ -148,7 +160,7 @@ class TaskFlowActionContainer(task.Task): LOG.critical(_LC("Oops! We need a disaster recover plan.")) -class TaskFlowNop(task.Task): +class TaskFlowNop(flow_task.Task): """This class is used in case of the workflow have only one Action. We need at least two atoms to create a link. diff --git a/watcher/conf/applier.py b/watcher/conf/applier.py index 70f7d65c0..ec1bf3860 100644 --- a/watcher/conf/applier.py +++ b/watcher/conf/applier.py @@ -40,7 +40,7 @@ APPLIER_MANAGER_OPTS = [ cfg.StrOpt('workflow_engine', default='taskflow', required=True, - help='Select the engine to use to execute the workflow') + help='Select the engine to use to execute the workflow'), ] diff --git a/watcher/decision_engine/strategy/strategies/dummy_with_resize.py b/watcher/decision_engine/strategy/strategies/dummy_with_resize.py index c35b2a0da..1c4c27ccb 100644 --- a/watcher/decision_engine/strategy/strategies/dummy_with_resize.py +++ b/watcher/decision_engine/strategy/strategies/dummy_with_resize.py @@ -67,20 +67,22 @@ class DummyWithResize(base.DummyBaseStrategy): action_type='migrate', resource_id='b199db0c-1408-4d52-b5a5-5ca14de0ff36', input_parameters={ - 'source_node': 'server1', - 'destination_node': 'server2'}) + 'source_node': 'compute2', + 'destination_node': 'compute3', + 'migration_type': 'live'}) self.solution.add_action( action_type='migrate', resource_id='8db1b3c1-7938-4c34-8c03-6de14b874f8f', input_parameters={ - 'source_node': 'server1', - 'destination_node': 'server2'} + 'source_node': 'compute2', + 'destination_node': 'compute3', + 'migration_type': 'live'} ) self.solution.add_action( action_type='resize', resource_id='8db1b3c1-7938-4c34-8c03-6de14b874f8f', - input_parameters={'flavor': 'x1'} + input_parameters={'flavor': 'x2'} ) def post_execute(self): diff --git a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py index 331c0a31c..1847f3b44 100644 --- a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py +++ b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py @@ -59,6 +59,7 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): config=mock.Mock(), context=self.context, applier_manager=mock.MagicMock()) + self.engine.config.max_workers = 2 @mock.patch('taskflow.engines.load') @mock.patch('taskflow.patterns.graph_flow.Flow.link') @@ -70,9 +71,9 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def create_action(self, action_type, parameters, parents): + def create_action(self, action_type, parameters, parents, uuid=None): action = { - 'uuid': utils.generate_uuid(), + 'uuid': uuid or utils.generate_uuid(), 'action_plan_id': 0, 'action_type': action_type, 'input_parameters': parameters, @@ -114,6 +115,85 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + def test_execute_nop_sleep(self): + actions = [] + first_nop = self.create_action("nop", {'message': 'test'}, []) + second_nop = self.create_action("nop", {'message': 'second test'}, []) + sleep = self.create_action("sleep", {'duration': 0.0}, + [first_nop.uuid, second_nop.uuid]) + actions.extend([first_nop, second_nop, sleep]) + + try: + self.engine.execute(actions) + self.check_actions_state(actions, objects.action.State.SUCCEEDED) + + except Exception as exc: + self.fail(exc) + + def test_execute_with_parents(self): + actions = [] + first_nop = self.create_action( + "nop", {'message': 'test'}, [], + uuid='bc7eee5c-4fbe-4def-9744-b539be55aa19') + second_nop = self.create_action( + "nop", {'message': 'second test'}, [], + uuid='0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23') + first_sleep = self.create_action( + "sleep", {'duration': 0.0}, [first_nop.uuid, second_nop.uuid], + uuid='be436531-0da3-4dad-a9c0-ea1d2aff6496') + second_sleep = self.create_action( + "sleep", {'duration': 0.0}, [first_sleep.uuid], + uuid='9eb51e14-936d-4d12-a500-6ba0f5e0bb1c') + actions.extend([first_nop, second_nop, first_sleep, second_sleep]) + + expected_nodes = [ + {'uuid': 'bc7eee5c-4fbe-4def-9744-b539be55aa19', + 'input_parameters': {u'message': u'test'}, + 'action_plan_id': 0, 'state': u'PENDING', 'parents': [], + 'action_type': u'nop', 'id': 1}, + {'uuid': '0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23', + 'input_parameters': {u'message': u'second test'}, + 'action_plan_id': 0, 'state': u'PENDING', 'parents': [], + 'action_type': u'nop', 'id': 2}, + {'uuid': 'be436531-0da3-4dad-a9c0-ea1d2aff6496', + 'input_parameters': {u'duration': 0.0}, + 'action_plan_id': 0, 'state': u'PENDING', + 'parents': [u'bc7eee5c-4fbe-4def-9744-b539be55aa19', + u'0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23'], + 'action_type': u'sleep', 'id': 3}, + {'uuid': '9eb51e14-936d-4d12-a500-6ba0f5e0bb1c', + 'input_parameters': {u'duration': 0.0}, + 'action_plan_id': 0, 'state': u'PENDING', + 'parents': [u'be436531-0da3-4dad-a9c0-ea1d2aff6496'], + 'action_type': u'sleep', 'id': 4}] + + expected_edges = [ + ('action_type:nop uuid:0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23', + 'action_type:sleep uuid:be436531-0da3-4dad-a9c0-ea1d2aff6496'), + ('action_type:nop uuid:bc7eee5c-4fbe-4def-9744-b539be55aa19', + 'action_type:sleep uuid:be436531-0da3-4dad-a9c0-ea1d2aff6496'), + ('action_type:sleep uuid:be436531-0da3-4dad-a9c0-ea1d2aff6496', + 'action_type:sleep uuid:9eb51e14-936d-4d12-a500-6ba0f5e0bb1c')] + + try: + flow = self.engine.execute(actions) + actual_nodes = sorted([x[0]._db_action.as_dict() + for x in flow.iter_nodes()], + key=lambda x: x['id']) + for expected, actual in zip(expected_nodes, actual_nodes): + for key in expected.keys(): + self.assertIn(expected[key], actual.values()) + actual_edges = [(u.name, v.name) + for (u, v, _) in flow.iter_links()] + + for edge in expected_edges: + self.assertIn(edge, actual_edges) + + self.check_actions_state(actions, objects.action.State.SUCCEEDED) + + except Exception as exc: + self.fail(exc) + def test_execute_with_two_actions(self): actions = [] second = self.create_action("sleep", {'duration': 0.0}, None)