Added support for versioned notifications

In this changeset, I added all the required modification in order
for Watcher to enable the implementation of versioned notifications.

Change-Id: I600ecbc767583824555b016fb9fc7faf69c53b39
Partially-Implements: blueprint watcher-notifications-ovo
This commit is contained in:
Vincent Françoise
2016-10-17 17:17:56 +02:00
parent 9dc3fce3e5
commit b27e5b91b9
17 changed files with 594 additions and 31 deletions

View File

@@ -136,6 +136,7 @@ class WatcherPersistentObject(object):
:param db_object: A DB model of the object
:param eager: Enable the loading of object fields (Default: False)
:return: The object of the class with the database entity added
"""
obj_class = type(obj)
object_fields = obj_class.object_fields

View File

@@ -24,8 +24,10 @@ from oslo_versionedobjects import fields
LOG = log.getLogger(__name__)
BaseEnumField = fields.BaseEnumField
BooleanField = fields.BooleanField
DateTimeField = fields.DateTimeField
Enum = fields.Enum
IntegerField = fields.IntegerField
ListOfStringsField = fields.ListOfStringsField
ObjectField = fields.ObjectField
@@ -88,3 +90,53 @@ class FlexibleListOfDictField(fields.AutoTypedField):
if self.nullable:
return []
super(FlexibleListOfDictField, self)._null(obj, attr)
# ### Notification fields ### #
class BaseWatcherEnum(Enum):
ALL = ()
def __init__(self, **kwargs):
super(BaseWatcherEnum, self).__init__(valid_values=self.__class__.ALL)
class NotificationPriority(BaseWatcherEnum):
CRITICAL = 'critical'
DEBUG = 'debug'
INFO = 'info'
ERROR = 'error'
SAMPLE = 'sample'
WARNING = 'warn'
ALL = (CRITICAL, DEBUG, INFO, ERROR, SAMPLE, WARNING)
class NotificationPhase(BaseWatcherEnum):
START = 'start'
END = 'end'
ERROR = 'error'
ALL = (START, END, ERROR)
class NotificationAction(BaseWatcherEnum):
CREATE = 'create'
UPDATE = 'update'
EXCEPTION = 'exception'
DELETE = 'delete'
ALL = (CREATE, UPDATE, EXCEPTION, DELETE)
class NotificationPriorityField(BaseEnumField):
AUTO_TYPE = NotificationPriority()
class NotificationPhaseField(BaseEnumField):
AUTO_TYPE = NotificationPhase()
class NotificationActionField(BaseEnumField):
AUTO_TYPE = NotificationAction()

View File

@@ -0,0 +1,166 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcher.common import rpc
from watcher.objects import base
from watcher.objects import fields as wfields
@base.WatcherObjectRegistry.register_if(False)
class NotificationObject(base.WatcherObject):
"""Base class for every notification related versioned object."""
# Version 1.0: Initial version
VERSION = '1.0'
def __init__(self, **kwargs):
super(NotificationObject, self).__init__(**kwargs)
# The notification objects are created on the fly when watcher emits
# the notification. This causes that every object shows every field as
# changed. We don't want to send this meaningless information so we
# reset the object after creation.
self.obj_reset_changes(recursive=False)
@base.WatcherObjectRegistry.register_notification
class EventType(NotificationObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'object': wfields.StringField(),
'action': wfields.NotificationActionField(),
'phase': wfields.NotificationPhaseField(nullable=True),
}
def to_notification_event_type_field(self):
"""Serialize the object to the wire format."""
s = '%s.%s' % (self.object, self.action)
if self.obj_attr_is_set('phase'):
s += '.%s' % self.phase
return s
@base.WatcherObjectRegistry.register_if(False)
class NotificationPayloadBase(NotificationObject):
"""Base class for the payload of versioned notifications."""
# SCHEMA defines how to populate the payload fields. It is a dictionary
# where every key value pair has the following format:
# <payload_field_name>: (<data_source_name>,
# <field_of_the_data_source>)
# The <payload_field_name> is the name where the data will be stored in the
# payload object, this field has to be defined as a field of the payload.
# The <data_source_name> shall refer to name of the parameter passed as
# kwarg to the payload's populate_schema() call and this object will be
# used as the source of the data. The <field_of_the_data_source> shall be
# a valid field of the passed argument.
# The SCHEMA needs to be applied with the populate_schema() call before the
# notification can be emitted.
# The value of the payload.<payload_field_name> field will be set by the
# <data_source_name>.<field_of_the_data_source> field. The
# <data_source_name> will not be part of the payload object internal or
# external representation.
# Payload fields that are not set by the SCHEMA can be filled in the same
# way as in any versioned object.
SCHEMA = {}
# Version 1.0: Initial version
VERSION = '1.0'
def __init__(self, **kwargs):
super(NotificationPayloadBase, self).__init__(**kwargs)
self.populated = not self.SCHEMA
def populate_schema(self, **kwargs):
"""Populate the object based on the SCHEMA and the source objects
:param kwargs: A dict contains the source object at the key defined in
the SCHEMA
"""
for key, (obj, field) in self.SCHEMA.items():
source = kwargs[obj]
if source.obj_attr_is_set(field):
setattr(self, key, getattr(source, field))
self.populated = True
# the schema population will create changed fields but we don't need
# this information in the notification
self.obj_reset_changes(recursive=False)
@base.WatcherObjectRegistry.register_notification
class NotificationPublisher(NotificationObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'host': wfields.StringField(nullable=False),
'binary': wfields.StringField(nullable=False),
}
@base.WatcherObjectRegistry.register_if(False)
class NotificationBase(NotificationObject):
"""Base class for versioned notifications.
Every subclass shall define a 'payload' field.
"""
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'priority': wfields.NotificationPriorityField(),
'event_type': wfields.ObjectField('EventType'),
'publisher': wfields.ObjectField('NotificationPublisher'),
}
def _emit(self, context, event_type, publisher_id, payload):
notifier = rpc.get_notifier(publisher_id)
notify = getattr(notifier, self.priority)
notify(context, event_type=event_type, payload=payload)
def emit(self, context):
"""Send the notification."""
assert self.payload.populated
# Note(gibi): notification payload will be a newly populated object
# therefore every field of it will look changed so this does not carry
# any extra information so we drop this from the payload.
self.payload.obj_reset_changes(recursive=False)
self._emit(
context,
event_type=self.event_type.to_notification_event_type_field(),
publisher_id='%s:%s' % (self.publisher.binary,
self.publisher.host),
payload=self.payload.obj_to_primitive())
def notification_sample(sample):
"""Provide a notification sample of the decatorated notification.
Class decorator to attach the notification sample information
to the notification object for documentation generation purposes.
:param sample: the path of the sample json file relative to the
doc/notification_samples/ directory in the watcher
repository root.
"""
def wrap(cls):
if not getattr(cls, 'samples', None):
cls.samples = [sample]
else:
cls.samples.append(sample)
return cls
return wrap

View File

@@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import six
from watcher.objects import base as base
from watcher.objects import fields as wfields
from watcher.objects.notifications import base as notificationbase
@base.WatcherObjectRegistry.register_notification
class ExceptionPayload(notificationbase.NotificationPayloadBase):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'module_name': wfields.StringField(),
'function_name': wfields.StringField(),
'exception': wfields.StringField(),
'exception_message': wfields.StringField()
}
@classmethod
def from_exception(cls, fault):
trace = inspect.trace()[-1]
# TODO(gibi): apply strutils.mask_password on exception_message and
# consider emitting the exception_message only if the safe flag is
# true in the exception like in the REST API
return cls(
function_name=trace[3],
module_name=inspect.getmodule(trace[0]).__name__,
exception=fault.__class__.__name__,
exception_message=six.text_type(fault))
@base.WatcherObjectRegistry.register_notification
class ExceptionNotification(notificationbase.NotificationBase):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'payload': wfields.ObjectField('ExceptionPayload')
}