Merge "Cancel Action Plan"
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Adds feature to cancel an action-plan.
|
||||
@@ -488,6 +488,7 @@ class ActionPlansController(rest.RestController):
|
||||
raise exception.PatchError(patch=patch, reason=e)
|
||||
|
||||
launch_action_plan = False
|
||||
cancel_action_plan = False
|
||||
|
||||
# transitions that are allowed via PATCH
|
||||
allowed_patch_transitions = [
|
||||
@@ -496,7 +497,7 @@ class ActionPlansController(rest.RestController):
|
||||
(ap_objects.State.RECOMMENDED,
|
||||
ap_objects.State.CANCELLED),
|
||||
(ap_objects.State.ONGOING,
|
||||
ap_objects.State.CANCELLED),
|
||||
ap_objects.State.CANCELLING),
|
||||
(ap_objects.State.PENDING,
|
||||
ap_objects.State.CANCELLED),
|
||||
]
|
||||
@@ -515,6 +516,8 @@ class ActionPlansController(rest.RestController):
|
||||
|
||||
if action_plan.state == ap_objects.State.PENDING:
|
||||
launch_action_plan = True
|
||||
if action_plan.state == ap_objects.State.CANCELLED:
|
||||
cancel_action_plan = True
|
||||
|
||||
# Update only the fields that have changed
|
||||
for field in objects.ActionPlan.fields:
|
||||
@@ -534,6 +537,16 @@ class ActionPlansController(rest.RestController):
|
||||
|
||||
action_plan_to_update.save()
|
||||
|
||||
# NOTE: if action plan is cancelled from pending or recommended
|
||||
# state update action state here only
|
||||
if cancel_action_plan:
|
||||
filters = {'action_plan_uuid': action_plan.uuid}
|
||||
actions = objects.Action.list(pecan.request.context,
|
||||
filters=filters, eager=True)
|
||||
for a in actions:
|
||||
a.state = objects.action.State.CANCELLED
|
||||
a.save()
|
||||
|
||||
if launch_action_plan:
|
||||
applier_client = rpcapi.ApplierAPI()
|
||||
applier_client.launch_action_plan(pecan.request.context,
|
||||
|
||||
@@ -20,6 +20,7 @@ from oslo_log import log
|
||||
|
||||
from watcher.applier.action_plan import base
|
||||
from watcher.applier import default
|
||||
from watcher.common import exception
|
||||
from watcher import notifications
|
||||
from watcher import objects
|
||||
from watcher.objects import fields
|
||||
@@ -39,6 +40,9 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
||||
try:
|
||||
action_plan = objects.ActionPlan.get_by_uuid(
|
||||
self.ctx, self.action_plan_uuid, eager=True)
|
||||
if action_plan.state == objects.action_plan.State.CANCELLED:
|
||||
self._update_action_from_pending_to_cancelled()
|
||||
return
|
||||
action_plan.state = objects.action_plan.State.ONGOING
|
||||
action_plan.save()
|
||||
notifications.action_plan.send_action_notification(
|
||||
@@ -54,6 +58,12 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
||||
self.ctx, action_plan,
|
||||
action=fields.NotificationAction.EXECUTION,
|
||||
phase=fields.NotificationPhase.END)
|
||||
|
||||
except exception.ActionPlanCancelled as e:
|
||||
LOG.exception(e)
|
||||
action_plan.state = objects.action_plan.State.CANCELLED
|
||||
self._update_action_from_pending_to_cancelled()
|
||||
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
action_plan.state = objects.action_plan.State.FAILED
|
||||
@@ -64,3 +74,12 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
||||
phase=fields.NotificationPhase.ERROR)
|
||||
finally:
|
||||
action_plan.save()
|
||||
|
||||
def _update_action_from_pending_to_cancelled(self):
|
||||
filters = {'action_plan_uuid': self.action_plan_uuid,
|
||||
'state': objects.action.State.PENDING}
|
||||
actions = objects.Action.list(self.ctx, filters=filters, eager=True)
|
||||
if actions:
|
||||
for a in actions:
|
||||
a.state = objects.action.State.CANCELLED
|
||||
a.save()
|
||||
|
||||
@@ -32,6 +32,9 @@ class BaseAction(loadable.Loadable):
|
||||
# watcher dashboard and will be nested in input_parameters
|
||||
RESOURCE_ID = 'resource_id'
|
||||
|
||||
# Add action class name to the list, if implementing abort.
|
||||
ABORT_TRUE = ['Sleep', 'Nop']
|
||||
|
||||
def __init__(self, config, osc=None):
|
||||
"""Constructor
|
||||
|
||||
@@ -134,3 +137,6 @@ class BaseAction(loadable.Loadable):
|
||||
def get_description(self):
|
||||
"""Description of the action"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def check_abort(self):
|
||||
return bool(self.__class__.__name__ in self.ABORT_TRUE)
|
||||
|
||||
@@ -164,6 +164,10 @@ class Migrate(base.BaseAction):
|
||||
def revert(self):
|
||||
return self.migrate(destination=self.source_node)
|
||||
|
||||
def abort(self):
|
||||
# TODO(adisky): implement abort for migration
|
||||
LOG.warning("Abort for migration not implemented")
|
||||
|
||||
def pre_condition(self):
|
||||
# TODO(jed): check if the instance exists / check if the instance is on
|
||||
# the source_node
|
||||
|
||||
@@ -23,7 +23,6 @@ import voluptuous
|
||||
|
||||
from watcher.applier.actions import base
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -69,3 +68,6 @@ class Nop(base.BaseAction):
|
||||
def get_description(self):
|
||||
"""Description of the action"""
|
||||
return "Logging a NOP message"
|
||||
|
||||
def abort(self):
|
||||
LOG.debug("Abort action NOP")
|
||||
|
||||
@@ -70,3 +70,6 @@ class Sleep(base.BaseAction):
|
||||
def get_description(self):
|
||||
"""Description of the action"""
|
||||
return "Wait for a given interval in seconds."
|
||||
|
||||
def abort(self):
|
||||
LOG.debug("Abort action sleep")
|
||||
|
||||
@@ -17,13 +17,17 @@
|
||||
#
|
||||
|
||||
import abc
|
||||
import six
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
|
||||
from oslo_log import log
|
||||
import six
|
||||
from taskflow import task as flow_task
|
||||
|
||||
from watcher.applier.actions import factory
|
||||
from watcher.common import clients
|
||||
from watcher.common import exception
|
||||
from watcher.common.loader import loadable
|
||||
from watcher import notifications
|
||||
from watcher import objects
|
||||
@@ -32,6 +36,9 @@ from watcher.objects import fields
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
CANCEL_STATE = [objects.action_plan.State.CANCELLING,
|
||||
objects.action_plan.State.CANCELLED]
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseWorkFlowEngine(loadable.Loadable):
|
||||
@@ -81,6 +88,10 @@ class BaseWorkFlowEngine(loadable.Loadable):
|
||||
def notify(self, action, state):
|
||||
db_action = objects.Action.get_by_uuid(self.context, action.uuid,
|
||||
eager=True)
|
||||
if (db_action.state in [objects.action.State.CANCELLING,
|
||||
objects.action.State.CANCELLED] and
|
||||
state == objects.action.State.SUCCEEDED):
|
||||
return
|
||||
db_action.state = state
|
||||
db_action.save()
|
||||
|
||||
@@ -122,16 +133,34 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
def do_post_execute(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def do_revert(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def do_abort(self, *args, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
# NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute,
|
||||
# post_execute) independently. We want to support notifications in base
|
||||
# class, so child's methods should be named with `do_` prefix and wrapped.
|
||||
def pre_execute(self):
|
||||
try:
|
||||
# NOTE(adisky): check the state of action plan before starting
|
||||
# next action, if action plan is cancelled raise the exceptions
|
||||
# so that taskflow does not schedule further actions.
|
||||
action_plan = objects.ActionPlan.get_by_id(
|
||||
self.engine.context, self._db_action.action_plan_id)
|
||||
if action_plan.state in CANCEL_STATE:
|
||||
raise exception.ActionPlanCancelled(uuid=action_plan.uuid)
|
||||
self.do_pre_execute()
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, self._db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.START)
|
||||
except exception.ActionPlanCancelled as e:
|
||||
LOG.exception(e)
|
||||
raise
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
self.engine.notify(self._db_action, objects.action.State.FAILED)
|
||||
@@ -142,22 +171,59 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
priority=fields.NotificationPriority.ERROR)
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
def _do_execute_action(*args, **kwargs):
|
||||
try:
|
||||
self.do_execute(*args, **kwargs)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, self._db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.END)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.error('The workflow engine has failed'
|
||||
'to execute the action: %s', self.name)
|
||||
self.engine.notify(self._db_action,
|
||||
objects.action.State.FAILED)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, self._db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.ERROR,
|
||||
priority=fields.NotificationPriority.ERROR)
|
||||
raise
|
||||
# NOTE: spawn a new thread for action execution, so that if action plan
|
||||
# is cancelled workflow engine will not wait to finish action execution
|
||||
et = eventlet.spawn(_do_execute_action, *args, **kwargs)
|
||||
# NOTE: check for the state of action plan periodically,so that if
|
||||
# action is finished or action plan is cancelled we can exit from here.
|
||||
while True:
|
||||
action_object = objects.Action.get_by_uuid(
|
||||
self.engine.context, self._db_action.uuid, eager=True)
|
||||
action_plan_object = objects.ActionPlan.get_by_id(
|
||||
self.engine.context, action_object.action_plan_id)
|
||||
if (action_object.state in [objects.action.State.SUCCEEDED,
|
||||
objects.action.State.FAILED] or
|
||||
action_plan_object.state in CANCEL_STATE):
|
||||
break
|
||||
time.sleep(2)
|
||||
try:
|
||||
self.do_execute(*args, **kwargs)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, self._db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.END)
|
||||
# NOTE: kill the action execution thread, if action plan is
|
||||
# cancelled for all other cases wait for the result from action
|
||||
# execution thread.
|
||||
# Not all actions support abort operations, kill only those action
|
||||
# which support abort operations
|
||||
abort = self.action.check_abort()
|
||||
if (action_plan_object.state in CANCEL_STATE and abort):
|
||||
et.kill()
|
||||
et.wait()
|
||||
|
||||
# NOTE: catch the greenlet exit exception due to thread kill,
|
||||
# taskflow will call revert for the action,
|
||||
# we will redirect it to abort.
|
||||
except eventlet.greenlet.GreenletExit:
|
||||
raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid)
|
||||
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.error('The workflow engine has failed '
|
||||
'to execute the action: %s', self.name)
|
||||
self.engine.notify(self._db_action, objects.action.State.FAILED)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, self._db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.ERROR,
|
||||
priority=fields.NotificationPriority.ERROR)
|
||||
raise
|
||||
|
||||
def post_execute(self):
|
||||
@@ -171,3 +237,24 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.ERROR,
|
||||
priority=fields.NotificationPriority.ERROR)
|
||||
|
||||
def revert(self, *args, **kwargs):
|
||||
action_plan = objects.ActionPlan.get_by_id(
|
||||
self.engine.context, self._db_action.action_plan_id, eager=True)
|
||||
# NOTE: check if revert cause by cancel action plan or
|
||||
# some other exception occured during action plan execution
|
||||
# if due to some other exception keep the flow intact.
|
||||
if action_plan.state not in CANCEL_STATE:
|
||||
self.do_revert()
|
||||
action_object = objects.Action.get_by_uuid(
|
||||
self.engine.context, self._db_action.uuid, eager=True)
|
||||
if action_object.state == objects.action.State.ONGOING:
|
||||
action_object.state = objects.action.State.CANCELLING
|
||||
action_object.save()
|
||||
self.abort()
|
||||
if action_object.state == objects.action.State.PENDING:
|
||||
action_object.state = objects.action.State.CANCELLED
|
||||
action_object.save()
|
||||
|
||||
def abort(self, *args, **kwargs):
|
||||
self.do_abort(*args, **kwargs)
|
||||
|
||||
@@ -19,6 +19,7 @@ from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from taskflow import engines
|
||||
from taskflow import exceptions as tf_exception
|
||||
from taskflow.patterns import graph_flow as gf
|
||||
from taskflow import task as flow_task
|
||||
|
||||
@@ -90,6 +91,15 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
|
||||
|
||||
return flow
|
||||
|
||||
except exception.ActionPlanCancelled as e:
|
||||
raise
|
||||
|
||||
except tf_exception.WrappedFailure as e:
|
||||
if e.check("watcher.common.exception.ActionPlanCancelled"):
|
||||
raise exception.ActionPlanCancelled
|
||||
else:
|
||||
raise exception.WorkflowExecutionException(error=e)
|
||||
|
||||
except Exception as e:
|
||||
raise exception.WorkflowExecutionException(error=e)
|
||||
|
||||
@@ -121,7 +131,7 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
|
||||
LOG.debug("Post-condition action: %s", self.name)
|
||||
self.action.post_condition()
|
||||
|
||||
def revert(self, *args, **kwargs):
|
||||
def do_revert(self, *args, **kwargs):
|
||||
LOG.warning("Revert action: %s", self.name)
|
||||
try:
|
||||
# TODO(jed): do we need to update the states in case of failure?
|
||||
@@ -130,6 +140,15 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
|
||||
LOG.exception(e)
|
||||
LOG.critical("Oops! We need a disaster recover plan.")
|
||||
|
||||
def do_abort(self, *args, **kwargs):
|
||||
LOG.warning("Aborting action: %s", self.name)
|
||||
try:
|
||||
self.action.abort()
|
||||
self.engine.notify(self._db_action, objects.action.State.CANCELLED)
|
||||
except Exception as e:
|
||||
self.engine.notify(self._db_action, objects.action.State.FAILED)
|
||||
LOG.exception(e)
|
||||
|
||||
|
||||
class TaskFlowNop(flow_task.Task):
|
||||
"""This class is used in case of the workflow have only one Action.
|
||||
|
||||
@@ -274,6 +274,10 @@ class ActionPlanReferenced(Invalid):
|
||||
"multiple actions")
|
||||
|
||||
|
||||
class ActionPlanCancelled(WatcherException):
|
||||
msg_fmt = _("Action Plan with UUID %(uuid)s is cancelled by user")
|
||||
|
||||
|
||||
class ActionPlanIsOngoing(Conflict):
|
||||
msg_fmt = _("Action Plan %(action_plan)s is currently running.")
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ class State(object):
|
||||
SUCCEEDED = 'SUCCEEDED'
|
||||
DELETED = 'DELETED'
|
||||
CANCELLED = 'CANCELLED'
|
||||
CANCELLING = 'CANCELLING'
|
||||
|
||||
|
||||
@base.WatcherObjectRegistry.register
|
||||
|
||||
@@ -94,6 +94,7 @@ class State(object):
|
||||
DELETED = 'DELETED'
|
||||
CANCELLED = 'CANCELLED'
|
||||
SUPERSEDED = 'SUPERSEDED'
|
||||
CANCELLING = 'CANCELLING'
|
||||
|
||||
|
||||
@base.WatcherObjectRegistry.register
|
||||
|
||||
@@ -456,7 +456,7 @@ ALLOWED_TRANSITIONS = [
|
||||
{"original_state": objects.action_plan.State.RECOMMENDED,
|
||||
"new_state": objects.action_plan.State.CANCELLED},
|
||||
{"original_state": objects.action_plan.State.ONGOING,
|
||||
"new_state": objects.action_plan.State.CANCELLED},
|
||||
"new_state": objects.action_plan.State.CANCELLING},
|
||||
{"original_state": objects.action_plan.State.PENDING,
|
||||
"new_state": objects.action_plan.State.CANCELLED},
|
||||
]
|
||||
|
||||
@@ -19,6 +19,7 @@ import mock
|
||||
|
||||
from watcher.applier.action_plan import default
|
||||
from watcher.applier import default as ap_applier
|
||||
from watcher.common import exception
|
||||
from watcher import notifications
|
||||
from watcher import objects
|
||||
from watcher.objects import action_plan as ap_objects
|
||||
@@ -99,3 +100,27 @@ class TestDefaultActionPlanHandler(base.DbTestCase):
|
||||
self.m_action_plan_notifications
|
||||
.send_action_notification
|
||||
.call_args_list)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||
def test_cancel_action_plan(self, m_get_action_plan):
|
||||
m_get_action_plan.return_value = self.action_plan
|
||||
self.action_plan.state = ap_objects.State.CANCELLED
|
||||
self.action_plan.save()
|
||||
command = default.DefaultActionPlanHandler(
|
||||
self.context, mock.MagicMock(), self.action_plan.uuid)
|
||||
command.execute()
|
||||
action = self.action.get_by_uuid(self.context, self.action.uuid)
|
||||
self.assertEqual(ap_objects.State.CANCELLED, self.action_plan.state)
|
||||
self.assertEqual(objects.action.State.CANCELLED, action.state)
|
||||
|
||||
@mock.patch.object(ap_applier.DefaultApplier, "execute")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||
def test_cancel_action_plan_with_exception(self, m_get_action_plan,
|
||||
m_execute):
|
||||
m_get_action_plan.return_value = self.action_plan
|
||||
m_execute.side_effect = exception.ActionPlanCancelled(
|
||||
self.action_plan.uuid)
|
||||
command = default.DefaultActionPlanHandler(
|
||||
self.context, mock.MagicMock(), self.action_plan.uuid)
|
||||
command.execute()
|
||||
self.assertEqual(ap_objects.State.CANCELLED, self.action_plan.state)
|
||||
|
||||
@@ -29,6 +29,7 @@ from watcher.common import utils
|
||||
from watcher import notifications
|
||||
from watcher import objects
|
||||
from watcher.tests.db import base
|
||||
from watcher.tests.objects import utils as obj_utils
|
||||
|
||||
|
||||
class ExpectedException(Exception):
|
||||
@@ -75,7 +76,8 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
def create_action(self, action_type, parameters, parents=None, uuid=None):
|
||||
def create_action(self, action_type, parameters, parents=None, uuid=None,
|
||||
state=None):
|
||||
action = {
|
||||
'uuid': uuid or utils.generate_uuid(),
|
||||
'action_plan_id': 0,
|
||||
@@ -88,7 +90,6 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
new_action = objects.Action(self.context, **action)
|
||||
with mock.patch.object(notifications.action, 'send_create'):
|
||||
new_action.create()
|
||||
|
||||
return new_action
|
||||
|
||||
def check_action_state(self, action, expected_state):
|
||||
@@ -110,10 +111,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_one_action(self, mock_send_update,
|
||||
mock_execution_notification):
|
||||
mock_execution_notification,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = [self.create_action("nop", {'message': 'test'})]
|
||||
try:
|
||||
self.engine.execute(actions)
|
||||
@@ -122,10 +127,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_nop_sleep(self, mock_send_update,
|
||||
mock_execution_notification):
|
||||
mock_execution_notification,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = []
|
||||
first_nop = self.create_action("nop", {'message': 'test'})
|
||||
second_nop = self.create_action("nop", {'message': 'second test'})
|
||||
@@ -140,10 +149,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_parents(self, mock_send_update,
|
||||
mock_execution_notification):
|
||||
mock_execution_notification,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = []
|
||||
first_nop = self.create_action(
|
||||
"nop", {'message': 'test'},
|
||||
@@ -208,9 +221,13 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_two_actions(self, m_send_update, m_execution):
|
||||
def test_execute_with_two_actions(self, m_send_update, m_execution,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = []
|
||||
second = self.create_action("sleep", {'duration': 0.0})
|
||||
first = self.create_action("nop", {'message': 'test'})
|
||||
@@ -225,11 +242,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_three_actions(self, m_send_update, m_execution):
|
||||
def test_execute_with_three_actions(self, m_send_update, m_execution,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = []
|
||||
|
||||
third = self.create_action("nop", {'message': 'next'})
|
||||
second = self.create_action("sleep", {'duration': 0.0})
|
||||
first = self.create_action("nop", {'message': 'hello'})
|
||||
@@ -249,9 +269,13 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_exception(self, m_send_update, m_execution):
|
||||
def test_execute_with_exception(self, m_send_update, m_execution,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = []
|
||||
|
||||
third = self.create_action("no_exist", {'message': 'next'})
|
||||
@@ -273,11 +297,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
self.check_action_state(second, objects.action.State.SUCCEEDED)
|
||||
self.check_action_state(third, objects.action.State.FAILED)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
@mock.patch.object(factory.ActionFactory, "make_action")
|
||||
def test_execute_with_action_exception(self, m_make_action, m_send_update,
|
||||
m_send_execution):
|
||||
m_send_execution, m_get_actionplan):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
actions = [self.create_action("fake_action", {})]
|
||||
m_make_action.return_value = FakeAction(mock.Mock())
|
||||
|
||||
@@ -286,3 +313,43 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
|
||||
self.assertIsInstance(exc.kwargs['error'], ExpectedException)
|
||||
self.check_action_state(actions[0], objects.action.State.FAILED)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||
def test_execute_with_action_plan_cancel(self, m_get_actionplan):
|
||||
obj_utils.create_test_goal(self.context)
|
||||
strategy = obj_utils.create_test_strategy(self.context)
|
||||
audit = obj_utils.create_test_audit(
|
||||
self.context, strategy_id=strategy.id)
|
||||
action_plan = obj_utils.create_test_action_plan(
|
||||
self.context, audit_id=audit.id,
|
||||
strategy_id=strategy.id,
|
||||
state=objects.action_plan.State.CANCELLING)
|
||||
action1 = obj_utils.create_test_action(
|
||||
self.context, action_plan_id=action_plan.id,
|
||||
action_type='nop', state=objects.action.State.SUCCEEDED,
|
||||
input_parameters={'message': 'hello World'})
|
||||
action2 = obj_utils.create_test_action(
|
||||
self.context, action_plan_id=action_plan.id,
|
||||
action_type='nop', state=objects.action.State.ONGOING,
|
||||
uuid='9eb51e14-936d-4d12-a500-6ba0f5e0bb1c',
|
||||
input_parameters={'message': 'hello World'})
|
||||
action3 = obj_utils.create_test_action(
|
||||
self.context, action_plan_id=action_plan.id,
|
||||
action_type='nop', state=objects.action.State.PENDING,
|
||||
uuid='bc7eee5c-4fbe-4def-9744-b539be55aa19',
|
||||
input_parameters={'message': 'hello World'})
|
||||
m_get_actionplan.return_value = action_plan
|
||||
actions = []
|
||||
actions.append(action1)
|
||||
actions.append(action2)
|
||||
actions.append(action3)
|
||||
|
||||
self.assertRaises(exception.ActionPlanCancelled,
|
||||
self.engine.execute, actions)
|
||||
try:
|
||||
self.check_action_state(action1, objects.action.State.SUCCEEDED)
|
||||
self.check_action_state(action2, objects.action.State.CANCELLED)
|
||||
self.check_action_state(action3, objects.action.State.CANCELLED)
|
||||
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from watcher.applier.workflow_engine import default as tflow
|
||||
from watcher import objects
|
||||
from watcher.tests.db import base
|
||||
from watcher.tests.objects import utils as obj_utils
|
||||
|
||||
|
||||
class TestTaskFlowActionContainer(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestTaskFlowActionContainer, self).setUp()
|
||||
self.engine = tflow.DefaultWorkFlowEngine(
|
||||
config=mock.Mock(),
|
||||
context=self.context,
|
||||
applier_manager=mock.MagicMock())
|
||||
obj_utils.create_test_goal(self.context)
|
||||
self.strategy = obj_utils.create_test_strategy(self.context)
|
||||
self.audit = obj_utils.create_test_audit(
|
||||
self.context, strategy_id=self.strategy.id)
|
||||
|
||||
def test_execute(self):
|
||||
action_plan = obj_utils.create_test_action_plan(
|
||||
self.context, audit_id=self.audit.id,
|
||||
strategy_id=self.strategy.id,
|
||||
state=objects.action.State.ONGOING)
|
||||
|
||||
action = obj_utils.create_test_action(
|
||||
self.context, action_plan_id=action_plan.id,
|
||||
state=objects.action.State.ONGOING,
|
||||
action_type='nop',
|
||||
input_parameters={'message': 'hello World'})
|
||||
action_container = tflow.TaskFlowActionContainer(
|
||||
db_action=action,
|
||||
engine=self.engine)
|
||||
action_container.execute()
|
||||
|
||||
self.assertTrue(action.state, objects.action.State.SUCCEEDED)
|
||||
|
||||
@mock.patch('eventlet.spawn')
|
||||
def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn):
|
||||
action_plan = obj_utils.create_test_action_plan(
|
||||
self.context, audit_id=self.audit.id,
|
||||
strategy_id=self.strategy.id,
|
||||
state=objects.action_plan.State.CANCELLING)
|
||||
|
||||
action = obj_utils.create_test_action(
|
||||
self.context, action_plan_id=action_plan.id,
|
||||
state=objects.action.State.ONGOING,
|
||||
action_type='nop',
|
||||
input_parameters={'message': 'hello World'})
|
||||
action_container = tflow.TaskFlowActionContainer(
|
||||
db_action=action,
|
||||
engine=self.engine)
|
||||
|
||||
def empty_test():
|
||||
pass
|
||||
et = eventlet.spawn(empty_test)
|
||||
mock_eventlet_spawn.return_value = et
|
||||
action_container.execute()
|
||||
et.kill.assert_called_with()
|
||||
Reference in New Issue
Block a user