Cancel Action Plan

This patch adds feature to cancel action plan in watcher.
A General flow from watcher-api to watcher-applier is implemented.

action plan cancel can cancel any [ongoing, pending, recommended]
action plan, it will update the action states also to "cancelled".
For ongoing actions in action plan, actions needs to be aborted.
Seperate patches will be added to support abort operation
in each action.

Notification part is addressed by a seperate blueprint.
https://blueprints.launchpad.net/watcher/+spec/notifications-actionplan-cancel

Change-Id: I895a5eaca5239d5657702c8d1875b9ece21682dc
Partially-Implements: blueprint cancel-action-plan
This commit is contained in:
aditi
2017-05-26 11:40:36 +00:00
parent 58d86de064
commit d7a44739a6
16 changed files with 362 additions and 28 deletions

View File

@@ -17,13 +17,17 @@
#
import abc
import six
import time
import eventlet
from oslo_log import log
import six
from taskflow import task as flow_task
from watcher.applier.actions import factory
from watcher.common import clients
from watcher.common import exception
from watcher.common.loader import loadable
from watcher import notifications
from watcher import objects
@@ -32,6 +36,9 @@ from watcher.objects import fields
LOG = log.getLogger(__name__)
CANCEL_STATE = [objects.action_plan.State.CANCELLING,
objects.action_plan.State.CANCELLED]
@six.add_metaclass(abc.ABCMeta)
class BaseWorkFlowEngine(loadable.Loadable):
@@ -81,6 +88,10 @@ class BaseWorkFlowEngine(loadable.Loadable):
def notify(self, action, state):
db_action = objects.Action.get_by_uuid(self.context, action.uuid,
eager=True)
if (db_action.state in [objects.action.State.CANCELLING,
objects.action.State.CANCELLED] and
state == objects.action.State.SUCCEEDED):
return
db_action.state = state
db_action.save()
@@ -122,16 +133,34 @@ class BaseTaskFlowActionContainer(flow_task.Task):
def do_post_execute(self):
raise NotImplementedError()
@abc.abstractmethod
def do_revert(self):
raise NotImplementedError()
@abc.abstractmethod
def do_abort(self, *args, **kwargs):
raise NotImplementedError()
# NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute,
# post_execute) independently. We want to support notifications in base
# class, so child's methods should be named with `do_` prefix and wrapped.
def pre_execute(self):
try:
# NOTE(adisky): check the state of action plan before starting
# next action, if action plan is cancelled raise the exceptions
# so that taskflow does not schedule further actions.
action_plan = objects.ActionPlan.get_by_id(
self.engine.context, self._db_action.action_plan_id)
if action_plan.state in CANCEL_STATE:
raise exception.ActionPlanCancelled(uuid=action_plan.uuid)
self.do_pre_execute()
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.START)
except exception.ActionPlanCancelled as e:
LOG.exception(e)
raise
except Exception as e:
LOG.exception(e)
self.engine.notify(self._db_action, objects.action.State.FAILED)
@@ -142,22 +171,59 @@ class BaseTaskFlowActionContainer(flow_task.Task):
priority=fields.NotificationPriority.ERROR)
def execute(self, *args, **kwargs):
def _do_execute_action(*args, **kwargs):
try:
self.do_execute(*args, **kwargs)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.END)
except Exception as e:
LOG.exception(e)
LOG.error('The workflow engine has failed'
'to execute the action: %s', self.name)
self.engine.notify(self._db_action,
objects.action.State.FAILED)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
raise
# NOTE: spawn a new thread for action execution, so that if action plan
# is cancelled workflow engine will not wait to finish action execution
et = eventlet.spawn(_do_execute_action, *args, **kwargs)
# NOTE: check for the state of action plan periodically,so that if
# action is finished or action plan is cancelled we can exit from here.
while True:
action_object = objects.Action.get_by_uuid(
self.engine.context, self._db_action.uuid, eager=True)
action_plan_object = objects.ActionPlan.get_by_id(
self.engine.context, action_object.action_plan_id)
if (action_object.state in [objects.action.State.SUCCEEDED,
objects.action.State.FAILED] or
action_plan_object.state in CANCEL_STATE):
break
time.sleep(2)
try:
self.do_execute(*args, **kwargs)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.END)
# NOTE: kill the action execution thread, if action plan is
# cancelled for all other cases wait for the result from action
# execution thread.
# Not all actions support abort operations, kill only those action
# which support abort operations
abort = self.action.check_abort()
if (action_plan_object.state in CANCEL_STATE and abort):
et.kill()
et.wait()
# NOTE: catch the greenlet exit exception due to thread kill,
# taskflow will call revert for the action,
# we will redirect it to abort.
except eventlet.greenlet.GreenletExit:
raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid)
except Exception as e:
LOG.exception(e)
LOG.error('The workflow engine has failed '
'to execute the action: %s', self.name)
self.engine.notify(self._db_action, objects.action.State.FAILED)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
raise
def post_execute(self):
@@ -171,3 +237,24 @@ class BaseTaskFlowActionContainer(flow_task.Task):
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
def revert(self, *args, **kwargs):
action_plan = objects.ActionPlan.get_by_id(
self.engine.context, self._db_action.action_plan_id, eager=True)
# NOTE: check if revert cause by cancel action plan or
# some other exception occured during action plan execution
# if due to some other exception keep the flow intact.
if action_plan.state not in CANCEL_STATE:
self.do_revert()
action_object = objects.Action.get_by_uuid(
self.engine.context, self._db_action.uuid, eager=True)
if action_object.state == objects.action.State.ONGOING:
action_object.state = objects.action.State.CANCELLING
action_object.save()
self.abort()
if action_object.state == objects.action.State.PENDING:
action_object.state = objects.action.State.CANCELLED
action_object.save()
def abort(self, *args, **kwargs):
self.do_abort(*args, **kwargs)