diff --git a/releasenotes/notes/action-plan-cancel-c54726378019e096.yaml b/releasenotes/notes/action-plan-cancel-c54726378019e096.yaml new file mode 100644 index 000000000..cf4b56280 --- /dev/null +++ b/releasenotes/notes/action-plan-cancel-c54726378019e096.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Adds feature to cancel an action-plan. diff --git a/watcher/api/controllers/v1/action_plan.py b/watcher/api/controllers/v1/action_plan.py index 5a8745a59..79dd5d81a 100644 --- a/watcher/api/controllers/v1/action_plan.py +++ b/watcher/api/controllers/v1/action_plan.py @@ -488,6 +488,7 @@ class ActionPlansController(rest.RestController): raise exception.PatchError(patch=patch, reason=e) launch_action_plan = False + cancel_action_plan = False # transitions that are allowed via PATCH allowed_patch_transitions = [ @@ -496,7 +497,7 @@ class ActionPlansController(rest.RestController): (ap_objects.State.RECOMMENDED, ap_objects.State.CANCELLED), (ap_objects.State.ONGOING, - ap_objects.State.CANCELLED), + ap_objects.State.CANCELLING), (ap_objects.State.PENDING, ap_objects.State.CANCELLED), ] @@ -515,6 +516,8 @@ class ActionPlansController(rest.RestController): if action_plan.state == ap_objects.State.PENDING: launch_action_plan = True + if action_plan.state == ap_objects.State.CANCELLED: + cancel_action_plan = True # Update only the fields that have changed for field in objects.ActionPlan.fields: @@ -534,6 +537,16 @@ class ActionPlansController(rest.RestController): action_plan_to_update.save() + # NOTE: if action plan is cancelled from pending or recommended + # state update action state here only + if cancel_action_plan: + filters = {'action_plan_uuid': action_plan.uuid} + actions = objects.Action.list(pecan.request.context, + filters=filters, eager=True) + for a in actions: + a.state = objects.action.State.CANCELLED + a.save() + if launch_action_plan: applier_client = rpcapi.ApplierAPI() applier_client.launch_action_plan(pecan.request.context, diff --git a/watcher/applier/action_plan/default.py b/watcher/applier/action_plan/default.py index 2462f8454..a63221e12 100644 --- a/watcher/applier/action_plan/default.py +++ b/watcher/applier/action_plan/default.py @@ -20,6 +20,7 @@ from oslo_log import log from watcher.applier.action_plan import base from watcher.applier import default +from watcher.common import exception from watcher import notifications from watcher import objects from watcher.objects import fields @@ -39,6 +40,9 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler): try: action_plan = objects.ActionPlan.get_by_uuid( self.ctx, self.action_plan_uuid, eager=True) + if action_plan.state == objects.action_plan.State.CANCELLED: + self._update_action_from_pending_to_cancelled() + return action_plan.state = objects.action_plan.State.ONGOING action_plan.save() notifications.action_plan.send_action_notification( @@ -54,6 +58,12 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler): self.ctx, action_plan, action=fields.NotificationAction.EXECUTION, phase=fields.NotificationPhase.END) + + except exception.ActionPlanCancelled as e: + LOG.exception(e) + action_plan.state = objects.action_plan.State.CANCELLED + self._update_action_from_pending_to_cancelled() + except Exception as e: LOG.exception(e) action_plan.state = objects.action_plan.State.FAILED @@ -64,3 +74,12 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler): phase=fields.NotificationPhase.ERROR) finally: action_plan.save() + + def _update_action_from_pending_to_cancelled(self): + filters = {'action_plan_uuid': self.action_plan_uuid, + 'state': objects.action.State.PENDING} + actions = objects.Action.list(self.ctx, filters=filters, eager=True) + if actions: + for a in actions: + a.state = objects.action.State.CANCELLED + a.save() diff --git a/watcher/applier/actions/base.py b/watcher/applier/actions/base.py index 6bffa5044..5bb330bea 100644 --- a/watcher/applier/actions/base.py +++ b/watcher/applier/actions/base.py @@ -32,6 +32,9 @@ class BaseAction(loadable.Loadable): # watcher dashboard and will be nested in input_parameters RESOURCE_ID = 'resource_id' + # Add action class name to the list, if implementing abort. + ABORT_TRUE = ['Sleep', 'Nop'] + def __init__(self, config, osc=None): """Constructor @@ -134,3 +137,6 @@ class BaseAction(loadable.Loadable): def get_description(self): """Description of the action""" raise NotImplementedError() + + def check_abort(self): + return bool(self.__class__.__name__ in self.ABORT_TRUE) diff --git a/watcher/applier/actions/migration.py b/watcher/applier/actions/migration.py index 663f84f30..94ec0af2b 100644 --- a/watcher/applier/actions/migration.py +++ b/watcher/applier/actions/migration.py @@ -164,6 +164,10 @@ class Migrate(base.BaseAction): def revert(self): return self.migrate(destination=self.source_node) + def abort(self): + # TODO(adisky): implement abort for migration + LOG.warning("Abort for migration not implemented") + def pre_condition(self): # TODO(jed): check if the instance exists / check if the instance is on # the source_node diff --git a/watcher/applier/actions/nop.py b/watcher/applier/actions/nop.py index 0a4969b18..6d80520f4 100644 --- a/watcher/applier/actions/nop.py +++ b/watcher/applier/actions/nop.py @@ -23,7 +23,6 @@ import voluptuous from watcher.applier.actions import base - LOG = log.getLogger(__name__) @@ -69,3 +68,6 @@ class Nop(base.BaseAction): def get_description(self): """Description of the action""" return "Logging a NOP message" + + def abort(self): + LOG.debug("Abort action NOP") diff --git a/watcher/applier/actions/sleep.py b/watcher/applier/actions/sleep.py index f472eeddc..dc2ed3d49 100644 --- a/watcher/applier/actions/sleep.py +++ b/watcher/applier/actions/sleep.py @@ -70,3 +70,6 @@ class Sleep(base.BaseAction): def get_description(self): """Description of the action""" return "Wait for a given interval in seconds." + + def abort(self): + LOG.debug("Abort action sleep") diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 7fff92c80..f83afcd7c 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -17,13 +17,17 @@ # import abc +import six +import time + +import eventlet from oslo_log import log -import six from taskflow import task as flow_task from watcher.applier.actions import factory from watcher.common import clients +from watcher.common import exception from watcher.common.loader import loadable from watcher import notifications from watcher import objects @@ -32,6 +36,9 @@ from watcher.objects import fields LOG = log.getLogger(__name__) +CANCEL_STATE = [objects.action_plan.State.CANCELLING, + objects.action_plan.State.CANCELLED] + @six.add_metaclass(abc.ABCMeta) class BaseWorkFlowEngine(loadable.Loadable): @@ -81,6 +88,10 @@ class BaseWorkFlowEngine(loadable.Loadable): def notify(self, action, state): db_action = objects.Action.get_by_uuid(self.context, action.uuid, eager=True) + if (db_action.state in [objects.action.State.CANCELLING, + objects.action.State.CANCELLED] and + state == objects.action.State.SUCCEEDED): + return db_action.state = state db_action.save() @@ -122,16 +133,34 @@ class BaseTaskFlowActionContainer(flow_task.Task): def do_post_execute(self): raise NotImplementedError() + @abc.abstractmethod + def do_revert(self): + raise NotImplementedError() + + @abc.abstractmethod + def do_abort(self, *args, **kwargs): + raise NotImplementedError() + # NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute, # post_execute) independently. We want to support notifications in base # class, so child's methods should be named with `do_` prefix and wrapped. def pre_execute(self): try: + # NOTE(adisky): check the state of action plan before starting + # next action, if action plan is cancelled raise the exceptions + # so that taskflow does not schedule further actions. + action_plan = objects.ActionPlan.get_by_id( + self.engine.context, self._db_action.action_plan_id) + if action_plan.state in CANCEL_STATE: + raise exception.ActionPlanCancelled(uuid=action_plan.uuid) self.do_pre_execute() notifications.action.send_execution_notification( self.engine.context, self._db_action, fields.NotificationAction.EXECUTION, fields.NotificationPhase.START) + except exception.ActionPlanCancelled as e: + LOG.exception(e) + raise except Exception as e: LOG.exception(e) self.engine.notify(self._db_action, objects.action.State.FAILED) @@ -142,22 +171,59 @@ class BaseTaskFlowActionContainer(flow_task.Task): priority=fields.NotificationPriority.ERROR) def execute(self, *args, **kwargs): + def _do_execute_action(*args, **kwargs): + try: + self.do_execute(*args, **kwargs) + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.END) + except Exception as e: + LOG.exception(e) + LOG.error('The workflow engine has failed' + 'to execute the action: %s', self.name) + self.engine.notify(self._db_action, + objects.action.State.FAILED) + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.ERROR, + priority=fields.NotificationPriority.ERROR) + raise + # NOTE: spawn a new thread for action execution, so that if action plan + # is cancelled workflow engine will not wait to finish action execution + et = eventlet.spawn(_do_execute_action, *args, **kwargs) + # NOTE: check for the state of action plan periodically,so that if + # action is finished or action plan is cancelled we can exit from here. + while True: + action_object = objects.Action.get_by_uuid( + self.engine.context, self._db_action.uuid, eager=True) + action_plan_object = objects.ActionPlan.get_by_id( + self.engine.context, action_object.action_plan_id) + if (action_object.state in [objects.action.State.SUCCEEDED, + objects.action.State.FAILED] or + action_plan_object.state in CANCEL_STATE): + break + time.sleep(2) try: - self.do_execute(*args, **kwargs) - notifications.action.send_execution_notification( - self.engine.context, self._db_action, - fields.NotificationAction.EXECUTION, - fields.NotificationPhase.END) + # NOTE: kill the action execution thread, if action plan is + # cancelled for all other cases wait for the result from action + # execution thread. + # Not all actions support abort operations, kill only those action + # which support abort operations + abort = self.action.check_abort() + if (action_plan_object.state in CANCEL_STATE and abort): + et.kill() + et.wait() + + # NOTE: catch the greenlet exit exception due to thread kill, + # taskflow will call revert for the action, + # we will redirect it to abort. + except eventlet.greenlet.GreenletExit: + raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid) + except Exception as e: LOG.exception(e) - LOG.error('The workflow engine has failed ' - 'to execute the action: %s', self.name) - self.engine.notify(self._db_action, objects.action.State.FAILED) - notifications.action.send_execution_notification( - self.engine.context, self._db_action, - fields.NotificationAction.EXECUTION, - fields.NotificationPhase.ERROR, - priority=fields.NotificationPriority.ERROR) raise def post_execute(self): @@ -171,3 +237,24 @@ class BaseTaskFlowActionContainer(flow_task.Task): fields.NotificationAction.EXECUTION, fields.NotificationPhase.ERROR, priority=fields.NotificationPriority.ERROR) + + def revert(self, *args, **kwargs): + action_plan = objects.ActionPlan.get_by_id( + self.engine.context, self._db_action.action_plan_id, eager=True) + # NOTE: check if revert cause by cancel action plan or + # some other exception occured during action plan execution + # if due to some other exception keep the flow intact. + if action_plan.state not in CANCEL_STATE: + self.do_revert() + action_object = objects.Action.get_by_uuid( + self.engine.context, self._db_action.uuid, eager=True) + if action_object.state == objects.action.State.ONGOING: + action_object.state = objects.action.State.CANCELLING + action_object.save() + self.abort() + if action_object.state == objects.action.State.PENDING: + action_object.state = objects.action.State.CANCELLED + action_object.save() + + def abort(self, *args, **kwargs): + self.do_abort(*args, **kwargs) diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index aa638c04c..9431ced85 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -19,6 +19,7 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log from taskflow import engines +from taskflow import exceptions as tf_exception from taskflow.patterns import graph_flow as gf from taskflow import task as flow_task @@ -90,6 +91,15 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): return flow + except exception.ActionPlanCancelled as e: + raise + + except tf_exception.WrappedFailure as e: + if e.check("watcher.common.exception.ActionPlanCancelled"): + raise exception.ActionPlanCancelled + else: + raise exception.WorkflowExecutionException(error=e) + except Exception as e: raise exception.WorkflowExecutionException(error=e) @@ -121,7 +131,7 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): LOG.debug("Post-condition action: %s", self.name) self.action.post_condition() - def revert(self, *args, **kwargs): + def do_revert(self, *args, **kwargs): LOG.warning("Revert action: %s", self.name) try: # TODO(jed): do we need to update the states in case of failure? @@ -130,6 +140,15 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): LOG.exception(e) LOG.critical("Oops! We need a disaster recover plan.") + def do_abort(self, *args, **kwargs): + LOG.warning("Aborting action: %s", self.name) + try: + self.action.abort() + self.engine.notify(self._db_action, objects.action.State.CANCELLED) + except Exception as e: + self.engine.notify(self._db_action, objects.action.State.FAILED) + LOG.exception(e) + class TaskFlowNop(flow_task.Task): """This class is used in case of the workflow have only one Action. diff --git a/watcher/common/exception.py b/watcher/common/exception.py index e3a80b638..2e08020e8 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -274,6 +274,10 @@ class ActionPlanReferenced(Invalid): "multiple actions") +class ActionPlanCancelled(WatcherException): + msg_fmt = _("Action Plan with UUID %(uuid)s is cancelled by user") + + class ActionPlanIsOngoing(Conflict): msg_fmt = _("Action Plan %(action_plan)s is currently running.") diff --git a/watcher/objects/action.py b/watcher/objects/action.py index 539d6619f..95f923a23 100644 --- a/watcher/objects/action.py +++ b/watcher/objects/action.py @@ -30,6 +30,7 @@ class State(object): SUCCEEDED = 'SUCCEEDED' DELETED = 'DELETED' CANCELLED = 'CANCELLED' + CANCELLING = 'CANCELLING' @base.WatcherObjectRegistry.register diff --git a/watcher/objects/action_plan.py b/watcher/objects/action_plan.py index b79854a46..4618ec953 100644 --- a/watcher/objects/action_plan.py +++ b/watcher/objects/action_plan.py @@ -94,6 +94,7 @@ class State(object): DELETED = 'DELETED' CANCELLED = 'CANCELLED' SUPERSEDED = 'SUPERSEDED' + CANCELLING = 'CANCELLING' @base.WatcherObjectRegistry.register diff --git a/watcher/tests/api/v1/test_actions_plans.py b/watcher/tests/api/v1/test_actions_plans.py index d04e1228d..b41743710 100644 --- a/watcher/tests/api/v1/test_actions_plans.py +++ b/watcher/tests/api/v1/test_actions_plans.py @@ -456,7 +456,7 @@ ALLOWED_TRANSITIONS = [ {"original_state": objects.action_plan.State.RECOMMENDED, "new_state": objects.action_plan.State.CANCELLED}, {"original_state": objects.action_plan.State.ONGOING, - "new_state": objects.action_plan.State.CANCELLED}, + "new_state": objects.action_plan.State.CANCELLING}, {"original_state": objects.action_plan.State.PENDING, "new_state": objects.action_plan.State.CANCELLED}, ] diff --git a/watcher/tests/applier/action_plan/test_default_action_handler.py b/watcher/tests/applier/action_plan/test_default_action_handler.py index 6949416d9..7aadee9d0 100755 --- a/watcher/tests/applier/action_plan/test_default_action_handler.py +++ b/watcher/tests/applier/action_plan/test_default_action_handler.py @@ -19,6 +19,7 @@ import mock from watcher.applier.action_plan import default from watcher.applier import default as ap_applier +from watcher.common import exception from watcher import notifications from watcher import objects from watcher.objects import action_plan as ap_objects @@ -99,3 +100,27 @@ class TestDefaultActionPlanHandler(base.DbTestCase): self.m_action_plan_notifications .send_action_notification .call_args_list) + + @mock.patch.object(objects.ActionPlan, "get_by_uuid") + def test_cancel_action_plan(self, m_get_action_plan): + m_get_action_plan.return_value = self.action_plan + self.action_plan.state = ap_objects.State.CANCELLED + self.action_plan.save() + command = default.DefaultActionPlanHandler( + self.context, mock.MagicMock(), self.action_plan.uuid) + command.execute() + action = self.action.get_by_uuid(self.context, self.action.uuid) + self.assertEqual(ap_objects.State.CANCELLED, self.action_plan.state) + self.assertEqual(objects.action.State.CANCELLED, action.state) + + @mock.patch.object(ap_applier.DefaultApplier, "execute") + @mock.patch.object(objects.ActionPlan, "get_by_uuid") + def test_cancel_action_plan_with_exception(self, m_get_action_plan, + m_execute): + m_get_action_plan.return_value = self.action_plan + m_execute.side_effect = exception.ActionPlanCancelled( + self.action_plan.uuid) + command = default.DefaultActionPlanHandler( + self.context, mock.MagicMock(), self.action_plan.uuid) + command.execute() + self.assertEqual(ap_objects.State.CANCELLED, self.action_plan.state) diff --git a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py index 5452db890..5e6b4a4ba 100644 --- a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py +++ b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py @@ -29,6 +29,7 @@ from watcher.common import utils from watcher import notifications from watcher import objects from watcher.tests.db import base +from watcher.tests.objects import utils as obj_utils class ExpectedException(Exception): @@ -75,7 +76,8 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def create_action(self, action_type, parameters, parents=None, uuid=None): + def create_action(self, action_type, parameters, parents=None, uuid=None, + state=None): action = { 'uuid': uuid or utils.generate_uuid(), 'action_plan_id': 0, @@ -88,7 +90,6 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): new_action = objects.Action(self.context, **action) with mock.patch.object(notifications.action, 'send_create'): new_action.create() - return new_action def check_action_state(self, action, expected_state): @@ -110,10 +111,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_one_action(self, mock_send_update, - mock_execution_notification): + mock_execution_notification, + m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [self.create_action("nop", {'message': 'test'})] try: self.engine.execute(actions) @@ -122,10 +127,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_nop_sleep(self, mock_send_update, - mock_execution_notification): + mock_execution_notification, + m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [] first_nop = self.create_action("nop", {'message': 'test'}) second_nop = self.create_action("nop", {'message': 'second test'}) @@ -140,10 +149,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') def test_execute_with_parents(self, mock_send_update, - mock_execution_notification): + mock_execution_notification, + m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [] first_nop = self.create_action( "nop", {'message': 'test'}, @@ -208,9 +221,13 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') - def test_execute_with_two_actions(self, m_send_update, m_execution): + def test_execute_with_two_actions(self, m_send_update, m_execution, + m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [] second = self.create_action("sleep", {'duration': 0.0}) first = self.create_action("nop", {'message': 'test'}) @@ -225,11 +242,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') - def test_execute_with_three_actions(self, m_send_update, m_execution): + def test_execute_with_three_actions(self, m_send_update, m_execution, + m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [] - third = self.create_action("nop", {'message': 'next'}) second = self.create_action("sleep", {'duration': 0.0}) first = self.create_action("nop", {'message': 'hello'}) @@ -249,9 +269,13 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') - def test_execute_with_exception(self, m_send_update, m_execution): + def test_execute_with_exception(self, m_send_update, m_execution, + m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [] third = self.create_action("no_exist", {'message': 'next'}) @@ -273,11 +297,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): self.check_action_state(second, objects.action.State.SUCCEEDED) self.check_action_state(third, objects.action.State.FAILED) + @mock.patch.object(objects.ActionPlan, "get_by_id") @mock.patch.object(notifications.action, 'send_execution_notification') @mock.patch.object(notifications.action, 'send_update') @mock.patch.object(factory.ActionFactory, "make_action") def test_execute_with_action_exception(self, m_make_action, m_send_update, - m_send_execution): + m_send_execution, m_get_actionplan): + m_get_actionplan.return_value = obj_utils.get_test_action_plan( + self.context, id=0) actions = [self.create_action("fake_action", {})] m_make_action.return_value = FakeAction(mock.Mock()) @@ -286,3 +313,43 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): self.assertIsInstance(exc.kwargs['error'], ExpectedException) self.check_action_state(actions[0], objects.action.State.FAILED) + + @mock.patch.object(objects.ActionPlan, "get_by_uuid") + def test_execute_with_action_plan_cancel(self, m_get_actionplan): + obj_utils.create_test_goal(self.context) + strategy = obj_utils.create_test_strategy(self.context) + audit = obj_utils.create_test_audit( + self.context, strategy_id=strategy.id) + action_plan = obj_utils.create_test_action_plan( + self.context, audit_id=audit.id, + strategy_id=strategy.id, + state=objects.action_plan.State.CANCELLING) + action1 = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + action_type='nop', state=objects.action.State.SUCCEEDED, + input_parameters={'message': 'hello World'}) + action2 = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + action_type='nop', state=objects.action.State.ONGOING, + uuid='9eb51e14-936d-4d12-a500-6ba0f5e0bb1c', + input_parameters={'message': 'hello World'}) + action3 = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + action_type='nop', state=objects.action.State.PENDING, + uuid='bc7eee5c-4fbe-4def-9744-b539be55aa19', + input_parameters={'message': 'hello World'}) + m_get_actionplan.return_value = action_plan + actions = [] + actions.append(action1) + actions.append(action2) + actions.append(action3) + + self.assertRaises(exception.ActionPlanCancelled, + self.engine.execute, actions) + try: + self.check_action_state(action1, objects.action.State.SUCCEEDED) + self.check_action_state(action2, objects.action.State.CANCELLED) + self.check_action_state(action3, objects.action.State.CANCELLED) + + except Exception as exc: + self.fail(exc) diff --git a/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py b/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py new file mode 100644 index 000000000..c05d47130 --- /dev/null +++ b/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py @@ -0,0 +1,79 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Authors: Jean-Emile DARTOIS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import eventlet +import mock + +from watcher.applier.workflow_engine import default as tflow +from watcher import objects +from watcher.tests.db import base +from watcher.tests.objects import utils as obj_utils + + +class TestTaskFlowActionContainer(base.DbTestCase): + def setUp(self): + super(TestTaskFlowActionContainer, self).setUp() + self.engine = tflow.DefaultWorkFlowEngine( + config=mock.Mock(), + context=self.context, + applier_manager=mock.MagicMock()) + obj_utils.create_test_goal(self.context) + self.strategy = obj_utils.create_test_strategy(self.context) + self.audit = obj_utils.create_test_audit( + self.context, strategy_id=self.strategy.id) + + def test_execute(self): + action_plan = obj_utils.create_test_action_plan( + self.context, audit_id=self.audit.id, + strategy_id=self.strategy.id, + state=objects.action.State.ONGOING) + + action = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + state=objects.action.State.ONGOING, + action_type='nop', + input_parameters={'message': 'hello World'}) + action_container = tflow.TaskFlowActionContainer( + db_action=action, + engine=self.engine) + action_container.execute() + + self.assertTrue(action.state, objects.action.State.SUCCEEDED) + + @mock.patch('eventlet.spawn') + def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn): + action_plan = obj_utils.create_test_action_plan( + self.context, audit_id=self.audit.id, + strategy_id=self.strategy.id, + state=objects.action_plan.State.CANCELLING) + + action = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + state=objects.action.State.ONGOING, + action_type='nop', + input_parameters={'message': 'hello World'}) + action_container = tflow.TaskFlowActionContainer( + db_action=action, + engine=self.engine) + + def empty_test(): + pass + et = eventlet.spawn(empty_test) + mock_eventlet_spawn.return_value = et + action_container.execute() + et.kill.assert_called_with()