Remove stale notification code

In this changeset, I cleaned up the Watcher codebase to remove
the old notification mechanism that is actually unused.

Partially Implements: blueprint watcher-notifications-ovo

Change-Id: I1901e65f031441b98a7d6f6c9c1c0364eaaaf481
This commit is contained in:
Vincent Françoise
2016-11-08 17:17:57 +01:00
parent cdee2719f7
commit 395ccbd94c
28 changed files with 45 additions and 760 deletions

View File

@@ -20,8 +20,6 @@ from oslo_log import log
from watcher.applier.action_plan import base
from watcher.applier import default
from watcher.applier.messaging import event_types
from watcher.common.messaging.events import event
from watcher import objects
LOG = log.getLogger(__name__)
@@ -34,32 +32,20 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
self.service = service
self.action_plan_uuid = action_plan_uuid
def notify(self, uuid, event_type, state):
def update_action_plan(self, uuid, state):
action_plan = objects.ActionPlan.get_by_uuid(self.ctx, uuid)
action_plan.state = state
action_plan.save()
ev = event.Event()
ev.type = event_type
ev.data = {}
payload = {'action_plan__uuid': uuid,
'action_plan_state': state}
self.service.publish_status_event(ev.type.name, payload)
def execute(self):
try:
# update state
self.notify(self.action_plan_uuid,
event_types.EventTypes.LAUNCH_ACTION_PLAN,
objects.action_plan.State.ONGOING)
self.update_action_plan(self.action_plan_uuid,
objects.action_plan.State.ONGOING)
applier = default.DefaultApplier(self.ctx, self.service)
applier.execute(self.action_plan_uuid)
state = objects.action_plan.State.SUCCEEDED
except Exception as e:
LOG.exception(e)
state = objects.action_plan.State.FAILED
finally:
# update state
self.notify(self.action_plan_uuid,
event_types.EventTypes.LAUNCH_ACTION_PLAN,
state)
self.update_action_plan(self.action_plan_uuid, state)

View File

@@ -1,25 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import enum
class EventTypes(enum.Enum):
LAUNCH_ACTION_PLAN = "launch_action_plan"
LAUNCH_ACTION = "launch_action"

View File

@@ -21,10 +21,8 @@ import abc
import six
from watcher.applier.actions import factory
from watcher.applier.messaging import event_types
from watcher.common import clients
from watcher.common.loader import loadable
from watcher.common.messaging.events import event
from watcher import objects
@@ -77,12 +75,7 @@ class BaseWorkFlowEngine(loadable.Loadable):
db_action = objects.Action.get_by_uuid(self.context, action.uuid)
db_action.state = state
db_action.save()
ev = event.Event()
ev.type = event_types.EventTypes.LAUNCH_ACTION
ev.data = {}
payload = {'action_uuid': action.uuid,
'action_state': state}
self.applier_manager.publish_status_event(ev.type.name, payload)
# NOTE(v-francoise): Implement notifications for action
@abc.abstractmethod
def execute(self, actions):

View File

@@ -1,54 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Event(object):
"""Generic event to use with EventDispatcher"""
def __init__(self, event_type=None, data=None, request_id=None):
"""Default constructor
:param event_type: the type of the event
:param data: a dictionary which contains data
:param request_id: a string which represent the uuid of the request
"""
self._type = event_type
self._data = data
self._request_id = request_id
@property
def type(self):
return self._type
@type.setter
def type(self, type):
self._type = type
@property
def data(self):
return self._data
@data.setter
def data(self, data):
self._data = data
@property
def request_id(self):
return self._request_id
@request_id.setter
def request_id(self, id):
self._request_id = id

View File

@@ -1,78 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log
from watcher.decision_engine.messaging import events as messaging_events
LOG = log.getLogger(__name__)
class EventDispatcher(object):
"""Generic event dispatcher which listen and dispatch events"""
def __init__(self):
self._events = dict()
def __del__(self):
self._events = None
def has_listener(self, event_type, listener):
"""Return true if listener is register to event_type """
# Check for event type and for the listener
if event_type in self._events.keys():
return listener in self._events[event_type]
else:
return False
def dispatch_event(self, event):
LOG.debug("dispatch evt : %s" % str(event.type))
"""
Dispatch an instance of Event class
"""
if messaging_events.Events.ALL in self._events.keys():
listeners = self._events[messaging_events.Events.ALL]
for listener in listeners:
listener(event)
# Dispatch the event to all the associated listeners
if event.type in self._events.keys():
listeners = self._events[event.type]
for listener in listeners:
listener(event)
def add_event_listener(self, event_type, listener):
"""Add an event listener for an event type"""
# Add listener to the event type
if not self.has_listener(event_type, listener):
listeners = self._events.get(event_type, [])
listeners.append(listener)
self._events[event_type] = listeners
def remove_event_listener(self, event_type, listener):
"""Remove event listener. """
# Remove the listener from the event type
if self.has_listener(event_type, listener):
listeners = self._events[event_type]
if len(listeners) == 1:
# Only this listener remains so remove the key
del self._events[event_type]
else:
# Update listeners chain
listeners.remove(listener)
self._events[event_type] = listeners

View File

@@ -1,120 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import threading
import eventlet
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as om
from watcher.common import rpc
from watcher._i18n import _LE, _LW
# NOTE:
# Ubuntu 14.04 forces librabbitmq when kombu is used
# Unfortunately it forces a version that has a crash
# bug. Calling eventlet.monkey_patch() tells kombu
# to use libamqp instead.
eventlet.monkey_patch()
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class MessagingHandler(threading.Thread):
def __init__(self, publisher_id, topic_name, endpoints, version,
serializer=None):
super(MessagingHandler, self).__init__()
self.publisher_id = publisher_id
self.topic_name = topic_name
self.__endpoints = []
self.__serializer = serializer
self.__version = version
self.__server = None
self.__notifier = None
self.__transport = None
self.add_endpoints(endpoints)
def add_endpoints(self, endpoints):
self.__endpoints.extend(endpoints)
def remove_endpoint(self, endpoint):
if endpoint in self.__endpoints:
self.__endpoints.remove(endpoint)
@property
def endpoints(self):
return self.__endpoints
@property
def transport(self):
return self.__transport
def build_notifier(self):
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
return om.Notifier(
self.__transport,
publisher_id=self.publisher_id,
topic=self.topic_name,
serializer=serializer
)
def build_server(self, target):
return om.get_rpc_server(self.__transport, target,
self.__endpoints,
executor='eventlet',
serializer=self.__serializer)
def _configure(self):
try:
self.__transport = om.get_transport(CONF)
self.__notifier = self.build_notifier()
if len(self.__endpoints):
target = om.Target(
topic=self.topic_name,
# For compatibility, we can override it with 'host' opt
server=CONF.host or socket.getfqdn(),
version=self.__version,
)
self.__server = self.build_server(target)
else:
LOG.warning(
_LW("No endpoint defined; can only publish events"))
except Exception as e:
LOG.exception(e)
LOG.error(_LE("Messaging configuration error"))
def run(self):
LOG.debug("configure MessagingHandler for %s" % self.topic_name)
self._configure()
if len(self.__endpoints) > 0:
LOG.debug("Starting up server")
self.__server.start()
def stop(self):
LOG.debug('Stopped server')
self.__server.stop()
def publish_event(self, event_type, payload, request_id=None):
self.__notifier.info(
{'version_api': self.__version,
'request_id': request_id},
{'event_id': event_type}, payload
)

View File

@@ -1,47 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import oslo_messaging as messaging
from watcher.common.messaging.utils import observable
eventlet.monkey_patch()
class NotificationHandler(observable.Observable):
def __init__(self, publisher_id):
super(NotificationHandler, self).__init__()
self.publisher_id = publisher_id
def info(self, ctx, publisher_id, event_type, payload, metadata):
if publisher_id == self.publisher_id:
self.set_changed()
self.notify(ctx, publisher_id, event_type, metadata, payload)
return messaging.NotificationResult.HANDLED
def warn(self, ctx, publisher_id, event_type, payload, metadata):
if publisher_id == self.publisher_id:
self.set_changed()
self.notify(ctx, publisher_id, event_type, metadata, payload)
return messaging.NotificationResult.HANDLED
def error(self, ctx, publisher_id, event_type, payload, metadata):
if publisher_id == self.publisher_id:
self.set_changed()
self.notify(ctx, publisher_id, event_type, metadata, payload)
return messaging.NotificationResult.HANDLED

View File

@@ -17,6 +17,7 @@
import datetime
import socket
import eventlet
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import _options
@@ -27,12 +28,10 @@ from oslo_reports import opts as gmr_opts
from oslo_service import service
from oslo_service import wsgi
from watcher._i18n import _, _LI
from watcher._i18n import _
from watcher.api import app
from watcher.common import config
from watcher.common import context
from watcher.common.messaging.events import event_dispatcher as dispatcher
from watcher.common.messaging import messaging_handler
from watcher.common import rpc
from watcher.common import scheduling
from watcher import objects
@@ -40,6 +39,13 @@ from watcher.objects import base
from watcher import opts
from watcher import version
# NOTE:
# Ubuntu 14.04 forces librabbitmq when kombu is used
# Unfortunately it forces a version that has a crash
# bug. Calling eventlet.monkey_patch() tells kombu
# to use libamqp instead.
eventlet.monkey_patch()
service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
@@ -153,7 +159,7 @@ class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
"""
class Service(service.ServiceBase, dispatcher.EventDispatcher):
class Service(service.ServiceBase):
API_VERSION = '1.0'
@@ -248,9 +254,16 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
self.status_client = c
def build_topic_handler(self, topic_name, endpoints=()):
return messaging_handler.MessagingHandler(
self.publisher_id, topic_name, [self.manager] + list(endpoints),
self.api_version, self.serializer)
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
target = om.Target(
topic=topic_name,
# For compatibility, we can override it with 'host' opt
server=CONF.host or socket.getfqdn(),
version=self.api_version,
)
return om.get_rpc_server(
self.transport, target, endpoints,
executor='eventlet', serializer=serializer)
def build_notification_handler(self, topic_names, endpoints=()):
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
@@ -290,34 +303,11 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
def wait(self):
"""Wait for service to complete."""
def publish_control(self, event, payload):
return self.conductor_topic_handler.publish_event(event, payload)
def publish_status_event(self, event, payload, request_id=None):
if self.status_topic_handler:
return self.status_topic_handler.publish_event(
event, payload, request_id)
else:
LOG.info(
_LI("No status notifier declared: notification '%s' not sent"),
event)
def get_version(self):
return self.api_version
def check_api_version(self, context):
def check_api_version(self, ctx):
api_manager_version = self.conductor_client.call(
context, 'check_api_version',
api_version=self.api_version)
ctx, 'check_api_version', api_version=self.api_version)
return api_manager_version
def response(self, evt, ctx, message):
payload = {
'request_id': ctx['request_id'],
'msg': message
}
self.publish_status_event(evt, payload)
def launch(conf, service_, workers=1, restart_method='reload'):
return service.launch(conf, service_, workers, restart_method)

View File

@@ -22,8 +22,6 @@ import six
from oslo_log import log
from watcher.common.messaging.events import event as watcher_event
from watcher.decision_engine.messaging import events as de_events
from watcher.decision_engine.planner import manager as planner_manager
from watcher.decision_engine.strategy.context import default as default_context
from watcher import objects
@@ -72,19 +70,11 @@ class AuditHandler(BaseAuditHandler):
def strategy_context(self):
return self._strategy_context
def notify(self, audit_uuid, event_type, status):
event = watcher_event.Event()
event.type = event_type
event.data = {}
payload = {'audit_uuid': audit_uuid,
'audit_status': status}
self.messaging.publish_status_event(event.type.name, payload)
def update_audit_state(self, audit, state):
@staticmethod
def update_audit_state(audit, state):
LOG.debug("Update audit state: %s", state)
audit.state = state
audit.save()
self.notify(audit.uuid, de_events.Events.TRIGGER_AUDIT, state)
def pre_execute(self, audit, request_context):
LOG.debug("Trigger audit %s", audit.uuid)

View File

@@ -1,26 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import enum
class Events(enum.Enum):
ALL = '*',
ACTION_PLAN = "action_plan"
TRIGGER_AUDIT = "trigger_audit"

View File

@@ -14,8 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from oslo_concurrency import lockutils
from oslo_log import log
from watcher._i18n import _LW
@@ -28,7 +27,6 @@ class Mapping(object):
self.model = model
self.compute_node_mapping = {}
self.instance_mapping = {}
self.lock = threading.Lock()
def map(self, node, instance):
"""Select the node where the instance is launched
@@ -36,9 +34,7 @@ class Mapping(object):
:param node: the node
:param instance: the virtual machine or instance
"""
try:
self.lock.acquire()
with lockutils.lock(__name__):
# init first
if node.uuid not in self.compute_node_mapping.keys():
self.compute_node_mapping[node.uuid] = set()
@@ -49,9 +45,6 @@ class Mapping(object):
# map instance => node
self.instance_mapping[instance.uuid] = node.uuid
finally:
self.lock.release()
def unmap(self, node, instance):
"""Remove the instance from the node
@@ -65,8 +58,7 @@ class Mapping(object):
:rtype : object
"""
try:
self.lock.acquire()
with lockutils.lock(__name__):
if str(node_uuid) in self.compute_node_mapping:
self.compute_node_mapping[str(node_uuid)].remove(
str(instance_uuid))
@@ -77,8 +69,6 @@ class Mapping(object):
_LW("Trying to delete the instance %(instance)s but it "
"was not found on node %(node)s") %
{'instance': instance_uuid, 'node': node_uuid})
finally:
self.lock.release()
def get_mapping(self):
return self.compute_node_mapping

View File

@@ -20,7 +20,6 @@
from oslo_config import cfg
from watcher.common import exception
from watcher.common.messaging import notification_handler
from watcher.common import service
from watcher.common import utils
from watcher.decision_engine import manager
@@ -78,7 +77,7 @@ class DecisionEngineAPIManager(object):
@property
def status_endpoints(self):
return [notification_handler.NotificationHandler]
return []
@property
def notification_endpoints(self):

View File

@@ -19,7 +19,6 @@
import mock
from watcher.applier.action_plan import default
from watcher.applier.messaging import event_types as ev
from watcher.objects import action_plan as ap_objects
from watcher.tests.db import base
from watcher.tests.objects import utils as obj_utils
@@ -41,20 +40,3 @@ class TestDefaultActionPlanHandler(base.DbTestCase):
action_plan = ap_objects.ActionPlan.get_by_uuid(
self.context, self.action_plan.uuid)
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
def test_trigger_audit_send_notification(self):
messaging = mock.MagicMock()
command = default.DefaultActionPlanHandler(
self.context, messaging, self.action_plan.uuid)
command.execute()
call_on_going = mock.call(ev.EventTypes.LAUNCH_ACTION_PLAN.name, {
'action_plan_state': ap_objects.State.ONGOING,
'action_plan__uuid': self.action_plan.uuid})
call_succeeded = mock.call(ev.EventTypes.LAUNCH_ACTION_PLAN.name, {
'action_plan_state': ap_objects.State.SUCCEEDED,
'action_plan__uuid': self.action_plan.uuid})
calls = [call_on_going, call_succeeded]
messaging.publish_status_event.assert_has_calls(calls)
self.assertEqual(2, messaging.publish_status_event.call_count)

View File

@@ -19,8 +19,8 @@
import mock
import oslo_messaging as om
from watcher.applier import manager as applier_manager
from watcher.common.messaging import messaging_handler
from watcher.common import service
from watcher.tests import base
@@ -30,8 +30,8 @@ class TestApplierManager(base.TestCase):
super(TestApplierManager, self).setUp()
self.applier = service.Service(applier_manager.ApplierManager)
@mock.patch.object(messaging_handler.MessagingHandler, "stop")
@mock.patch.object(messaging_handler.MessagingHandler, "start")
@mock.patch.object(om.rpc.server.RPCServer, "stop")
@mock.patch.object(om.rpc.server.RPCServer, "start")
def test_start(self, m_messaging_start, m_messaging_stop):
self.applier.start()
self.applier.stop()

View File

@@ -27,14 +27,11 @@ from watcher.tests import base
class TestApplierAPI(base.TestCase):
def setUp(self):
super(TestApplierAPI, self).setUp()
api = rpcapi.ApplierAPI()
def test_get_version(self):
expected_version = self.api.API_VERSION
self.assertEqual(expected_version, self.api.get_version())
def setUp(self):
super(TestApplierAPI, self).setUp()
def test_get_api_version(self):
with mock.patch.object(om.RPCClient, 'call') as mock_call:

View File

@@ -1 +0,0 @@
__author__ = 'bcom'

View File

@@ -1,84 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from mock import call
from watcher.common.messaging.events import event as messaging_event
from watcher.common.messaging.events import event_dispatcher
from watcher.decision_engine.messaging import events as messaging_events
from watcher.tests import base
class TestEventDispatcher(base.TestCase):
def setUp(self):
super(TestEventDispatcher, self).setUp()
self.event_dispatcher = event_dispatcher.EventDispatcher()
def fake_listener(self):
return mock.MagicMock()
def fake_event(self, event_type):
event = messaging_event.Event()
event.type = event_type
return event
def test_add_listener(self):
listener = self.fake_listener()
self.event_dispatcher.add_event_listener(messaging_events.Events.ALL,
listener)
self.assertTrue(self.event_dispatcher.has_listener(
messaging_events.Events.ALL, listener))
def test_remove_listener(self):
listener = self.fake_listener()
self.event_dispatcher.add_event_listener(messaging_events.Events.ALL,
listener)
self.event_dispatcher.remove_event_listener(
messaging_events.Events.ALL, listener)
self.assertFalse(self.event_dispatcher.has_listener(
messaging_events.Events.TRIGGER_AUDIT, listener))
def test_dispatch_event(self):
listener = self.fake_listener()
event = self.fake_event(messaging_events.Events.TRIGGER_AUDIT)
self.event_dispatcher.add_event_listener(
messaging_events.Events.TRIGGER_AUDIT, listener)
self.event_dispatcher.dispatch_event(event)
listener.assert_has_calls(calls=[call(event)])
def test_dispatch_event_to_all_listener(self):
event = self.fake_event(messaging_events.Events.ACTION_PLAN)
listener_all = self.fake_listener()
listener_action_plan = self.fake_listener()
listener_trigger_audit = self.fake_listener()
self.event_dispatcher.add_event_listener(
messaging_events.Events.ALL, listener_all)
self.event_dispatcher.add_event_listener(
messaging_events.Events.ACTION_PLAN, listener_action_plan)
self.event_dispatcher.add_event_listener(
messaging_events.Events.TRIGGER_AUDIT, listener_trigger_audit)
self.event_dispatcher.dispatch_event(event)
listener_all.assert_has_calls(calls=[call(event)])
listener_action_plan.assert_has_calls(calls=[call(event)])
listener_trigger_audit.assert_has_calls([])

View File

@@ -1,78 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
import oslo_messaging as messaging
from watcher.common.messaging import messaging_handler
from watcher.tests import base
CONF = cfg.CONF
class TestMessagingHandler(base.TestCase):
PUBLISHER_ID = 'TEST_API'
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
ENDPOINT = 'http://fake-fqdn:1337'
VERSION = "1.0"
def setUp(self):
super(TestMessagingHandler, self).setUp()
CONF.set_default('host', 'fake-fqdn')
@mock.patch.object(messaging, "get_rpc_server")
@mock.patch.object(messaging, "Target")
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
m_target = mock.Mock()
m_target_cls.return_value = m_target
handler = messaging_handler.MessagingHandler(
publisher_id=self.PUBLISHER_ID,
topic_name=self.TOPIC_WATCHER,
endpoints=[self.ENDPOINT],
version=self.VERSION,
serializer=None,
)
handler.run()
m_target_cls.assert_called_once_with(
server="fake-fqdn",
topic="TEST_TOPIC_WATCHER",
version="1.0",
)
m_get_rpc_server.assert_called_once_with(
handler.transport,
m_target,
[self.ENDPOINT],
executor='eventlet',
serializer=None,
)
def test_messaging_handler_remove_endpoint(self):
handler = messaging_handler.MessagingHandler(
publisher_id=self.PUBLISHER_ID,
topic_name=self.TOPIC_WATCHER,
endpoints=[self.ENDPOINT],
version=self.VERSION,
serializer=None,
)
self.assertEqual([self.ENDPOINT], handler.endpoints)
handler.remove_endpoint(self.ENDPOINT)
self.assertEqual([], handler.endpoints)

View File

@@ -1,56 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import oslo_messaging as messaging
from watcher.common.messaging import notification_handler
from watcher.common.messaging.utils import observable
from watcher.tests import base
PUBLISHER_ID = 'TEST_API'
class TestNotificationHandler(base.TestCase):
def setUp(self):
super(TestNotificationHandler, self).setUp()
self.notification_handler = notification_handler.NotificationHandler(
PUBLISHER_ID)
def _test_notify(self, level_to_call):
ctx = {}
publisher_id = PUBLISHER_ID
event_type = 'Test'
payload = {}
metadata = {}
with mock.patch.object(observable.Observable, 'notify') as mock_call:
notification_result = level_to_call(ctx, publisher_id, event_type,
payload, metadata)
self.assertEqual(messaging.NotificationResult.HANDLED,
notification_result)
mock_call.assert_called_once_with(ctx, publisher_id, event_type,
metadata, payload)
def test_notify_info(self):
self._test_notify(self.notification_handler.info)
def test_notify_warn(self):
self._test_notify(self.notification_handler.warn)
def test_notify_error(self):
self._test_notify(self.notification_handler.error)

View File

@@ -19,7 +19,7 @@ import mock
from oslo_config import cfg
from watcher.common.messaging import messaging_handler
import oslo_messaging as om
from watcher.common import rpc
from watcher.common import service
from watcher import objects
@@ -81,13 +81,13 @@ class TestService(base.TestCase):
def setUp(self):
super(TestService, self).setUp()
@mock.patch.object(messaging_handler, "MessagingHandler")
@mock.patch.object(om.rpc.server, "RPCServer")
def test_start(self, m_handler):
dummy_service = service.Service(DummyManager)
dummy_service.start()
self.assertEqual(2, m_handler.call_count)
@mock.patch.object(messaging_handler, "MessagingHandler")
@mock.patch.object(om.rpc.server, "RPCServer")
def test_stop(self, m_handler):
dummy_service = service.Service(DummyManager)
dummy_service.stop()
@@ -98,6 +98,8 @@ class TestService(base.TestCase):
dummy_service = service.Service(DummyManager)
handler = dummy_service.build_topic_handler(topic_name)
self.assertIsNotNone(handler)
self.assertIsInstance(handler, om.rpc.server.RPCServer)
self.assertEqual("mytopic", handler._target.topic)
def test_init_service(self):
dummy_service = service.Service(DummyManager)
@@ -105,58 +107,7 @@ class TestService(base.TestCase):
rpc.RequestContextSerializer)
self.assertIsInstance(
dummy_service.conductor_topic_handler,
messaging_handler.MessagingHandler)
om.rpc.server.RPCServer)
self.assertIsInstance(
dummy_service.status_topic_handler,
messaging_handler.MessagingHandler)
@mock.patch.object(messaging_handler, "MessagingHandler")
def test_publish_control(self, m_handler_cls):
m_handler = mock.Mock()
m_handler_cls.return_value = m_handler
payload = {
"name": "value",
}
event = "myevent"
dummy_service = service.Service(DummyManager)
dummy_service.publish_control(event, payload)
m_handler.publish_event.assert_called_once_with(event, payload)
@mock.patch.object(messaging_handler, "MessagingHandler")
def test_publish_status_event(self, m_handler_cls):
m_handler = mock.Mock()
m_handler_cls.return_value = m_handler
payload = {
"name": "value",
}
event = "myevent"
dummy_service = service.Service(DummyManager)
dummy_service.publish_status_event(event, payload)
m_handler.publish_event.assert_called_once_with(event, payload, None)
@mock.patch.object(service.Service, 'publish_status_event')
def test_response(self, mock_call):
event = "My event"
context = {'request_id': 12}
message = "My Message"
dummy_service = service.Service(DummyManager)
dummy_service.response(event, context, message)
expected_payload = {
'request_id': context['request_id'],
'msg': message
}
mock_call.assert_called_once_with(event, expected_payload)
def test_messaging_build_topic_handler(self):
dummy_service = service.Service(DummyManager)
topic = dummy_service.build_topic_handler("conductor_topic")
self.assertIsInstance(topic, messaging_handler.MessagingHandler)
self.assertEqual("pub_id", dummy_service.publisher_id)
self.assertEqual("pub_id", topic.publisher_id)
self.assertEqual("conductor_topic",
dummy_service.conductor_topic_handler.topic_name)
self.assertEqual("conductor_topic", topic.topic_name)
om.rpc.server.RPCServer)

View File

@@ -21,7 +21,6 @@ import mock
from watcher.decision_engine.audit import continuous
from watcher.decision_engine.audit import oneshot
from watcher.decision_engine.messaging import events
from watcher.decision_engine.model.collector import manager
from watcher.objects import audit as audit_objects
from watcher.tests.db import base
@@ -57,25 +56,6 @@ class TestOneShotAuditHandler(base.DbTestCase):
audit = audit_objects.Audit.get_by_uuid(self.context, self.audit.uuid)
self.assertEqual(audit_objects.State.SUCCEEDED, audit.state)
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
def test_trigger_audit_send_notification(self, mock_collector):
messaging = mock.MagicMock()
mock_collector.return_value = faker.FakerModelCollector()
audit_handler = oneshot.OneShotAuditHandler(messaging)
audit_handler.execute(self.audit, self.context)
call_on_going = mock.call(events.Events.TRIGGER_AUDIT.name, {
'audit_status': audit_objects.State.ONGOING,
'audit_uuid': self.audit.uuid})
call_succeeded = mock.call(events.Events.TRIGGER_AUDIT.name, {
'audit_status': audit_objects.State.SUCCEEDED,
'audit_uuid': self.audit.uuid})
calls = [call_on_going, call_succeeded]
messaging.publish_status_event.assert_has_calls(calls)
self.assertEqual(
2, messaging.publish_status_event.call_count)
class TestContinuousAuditHandler(base.DbTestCase):
def setUp(self):

View File

@@ -29,10 +29,6 @@ class TestDecisionEngineAPI(base.TestCase):
api = rpcapi.DecisionEngineAPI()
def test_get_version(self):
expected_version = self.api.API_VERSION
self.assertEqual(expected_version, self.api.get_version())
def test_get_api_version(self):
with mock.patch.object(om.RPCClient, 'call') as mock_call:
expected_context = self.context