Add Action Notification

This patch set adds the following action notifications:

- action.create
- action.update
- action.delete
- action.execution.start
- action.execution.end
- action.execution.error

Partially Implements: blueprint action-versioned-notifications-api

Change-Id: If0bc25bfb7cb1bff3bfa2c5d5fb9ad48b0794168
This commit is contained in:
Alexander Chadin
2017-02-01 14:21:18 +03:00
committed by alexchadin
parent 62cb8a8d29
commit 25789c9c5a
22 changed files with 1188 additions and 120 deletions

View File

@@ -18,12 +18,20 @@
import abc
from oslo_log import log
import six
from taskflow import task as flow_task
from watcher._i18n import _LE
from watcher.applier.actions import factory
from watcher.common import clients
from watcher.common.loader import loadable
from watcher import notifications
from watcher import objects
from watcher.objects import fields
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
@@ -72,11 +80,95 @@ class BaseWorkFlowEngine(loadable.Loadable):
return self._action_factory
def notify(self, action, state):
db_action = objects.Action.get_by_uuid(self.context, action.uuid)
db_action = objects.Action.get_by_uuid(self.context, action.uuid,
eager=True)
db_action.state = state
db_action.save()
# NOTE(v-francoise): Implement notifications for action
@abc.abstractmethod
def execute(self, actions):
raise NotImplementedError()
class BaseTaskFlowActionContainer(flow_task.Task):
def __init__(self, name, db_action, engine, **kwargs):
super(BaseTaskFlowActionContainer, self).__init__(name=name)
self._db_action = db_action
self._engine = engine
self.loaded_action = None
@property
def engine(self):
return self._engine
@property
def action(self):
if self.loaded_action is None:
action = self.engine.action_factory.make_action(
self._db_action,
osc=self._engine.osc)
self.loaded_action = action
return self.loaded_action
@abc.abstractmethod
def do_pre_execute(self):
raise NotImplementedError()
@abc.abstractmethod
def do_execute(self, *args, **kwargs):
raise NotImplementedError()
@abc.abstractmethod
def do_post_execute(self):
raise NotImplementedError()
# NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute,
# post_execute) independently. We want to support notifications in base
# class, so child's methods should be named with `do_` prefix and wrapped.
def pre_execute(self):
try:
self.do_pre_execute()
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.START)
except Exception as e:
LOG.exception(e)
self.engine.notify(self._db_action, objects.action.State.FAILED)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
def execute(self, *args, **kwargs):
try:
self.do_execute(*args, **kwargs)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.END)
except Exception as e:
LOG.exception(e)
LOG.error(_LE('The workflow engine has failed '
'to execute the action: %s'), self.name)
self.engine.notify(self._db_action, objects.action.State.FAILED)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
raise
def post_execute(self):
try:
self.do_post_execute()
except Exception as e:
LOG.exception(e)
self.engine.notify(self._db_action, objects.action.State.FAILED)
notifications.action.send_execution_notification(
self.engine.context, self._db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)