diff --git a/watcher/applier/action_plan/default.py b/watcher/applier/action_plan/default.py index c9f4dcf93..0b1d428d8 100644 --- a/watcher/applier/action_plan/default.py +++ b/watcher/applier/action_plan/default.py @@ -20,8 +20,6 @@ from oslo_log import log from watcher.applier.action_plan import base from watcher.applier import default -from watcher.applier.messaging import event_types -from watcher.common.messaging.events import event from watcher import objects LOG = log.getLogger(__name__) @@ -34,32 +32,20 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler): self.service = service self.action_plan_uuid = action_plan_uuid - def notify(self, uuid, event_type, state): + def update_action_plan(self, uuid, state): action_plan = objects.ActionPlan.get_by_uuid(self.ctx, uuid) action_plan.state = state action_plan.save() - ev = event.Event() - ev.type = event_type - ev.data = {} - payload = {'action_plan__uuid': uuid, - 'action_plan_state': state} - self.service.publish_status_event(ev.type.name, payload) def execute(self): try: - # update state - self.notify(self.action_plan_uuid, - event_types.EventTypes.LAUNCH_ACTION_PLAN, - objects.action_plan.State.ONGOING) + self.update_action_plan(self.action_plan_uuid, + objects.action_plan.State.ONGOING) applier = default.DefaultApplier(self.ctx, self.service) applier.execute(self.action_plan_uuid) state = objects.action_plan.State.SUCCEEDED except Exception as e: LOG.exception(e) state = objects.action_plan.State.FAILED - finally: - # update state - self.notify(self.action_plan_uuid, - event_types.EventTypes.LAUNCH_ACTION_PLAN, - state) + self.update_action_plan(self.action_plan_uuid, state) diff --git a/watcher/applier/messaging/event_types.py b/watcher/applier/messaging/event_types.py deleted file mode 100644 index d6b916964..000000000 --- a/watcher/applier/messaging/event_types.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import enum - - -class EventTypes(enum.Enum): - LAUNCH_ACTION_PLAN = "launch_action_plan" - LAUNCH_ACTION = "launch_action" diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 1c614183c..2cc4beadf 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -21,10 +21,8 @@ import abc import six from watcher.applier.actions import factory -from watcher.applier.messaging import event_types from watcher.common import clients from watcher.common.loader import loadable -from watcher.common.messaging.events import event from watcher import objects @@ -77,12 +75,7 @@ class BaseWorkFlowEngine(loadable.Loadable): db_action = objects.Action.get_by_uuid(self.context, action.uuid) db_action.state = state db_action.save() - ev = event.Event() - ev.type = event_types.EventTypes.LAUNCH_ACTION - ev.data = {} - payload = {'action_uuid': action.uuid, - 'action_state': state} - self.applier_manager.publish_status_event(ev.type.name, payload) + # NOTE(v-francoise): Implement notifications for action @abc.abstractmethod def execute(self, actions): diff --git a/watcher/common/messaging/__init__.py b/watcher/common/messaging/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/common/messaging/events/__init__.py b/watcher/common/messaging/events/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/common/messaging/events/event.py b/watcher/common/messaging/events/event.py deleted file mode 100644 index 8c58112b8..000000000 --- a/watcher/common/messaging/events/event.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class Event(object): - """Generic event to use with EventDispatcher""" - - def __init__(self, event_type=None, data=None, request_id=None): - """Default constructor - - :param event_type: the type of the event - :param data: a dictionary which contains data - :param request_id: a string which represent the uuid of the request - """ - self._type = event_type - self._data = data - self._request_id = request_id - - @property - def type(self): - return self._type - - @type.setter - def type(self, type): - self._type = type - - @property - def data(self): - return self._data - - @data.setter - def data(self, data): - self._data = data - - @property - def request_id(self): - return self._request_id - - @request_id.setter - def request_id(self, id): - self._request_id = id diff --git a/watcher/common/messaging/events/event_dispatcher.py b/watcher/common/messaging/events/event_dispatcher.py deleted file mode 100644 index 8382a81ec..000000000 --- a/watcher/common/messaging/events/event_dispatcher.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_log import log - -from watcher.decision_engine.messaging import events as messaging_events - -LOG = log.getLogger(__name__) - - -class EventDispatcher(object): - """Generic event dispatcher which listen and dispatch events""" - - def __init__(self): - self._events = dict() - - def __del__(self): - self._events = None - - def has_listener(self, event_type, listener): - """Return true if listener is register to event_type """ - # Check for event type and for the listener - if event_type in self._events.keys(): - return listener in self._events[event_type] - else: - return False - - def dispatch_event(self, event): - LOG.debug("dispatch evt : %s" % str(event.type)) - """ - Dispatch an instance of Event class - """ - if messaging_events.Events.ALL in self._events.keys(): - listeners = self._events[messaging_events.Events.ALL] - for listener in listeners: - listener(event) - - # Dispatch the event to all the associated listeners - if event.type in self._events.keys(): - listeners = self._events[event.type] - for listener in listeners: - listener(event) - - def add_event_listener(self, event_type, listener): - """Add an event listener for an event type""" - # Add listener to the event type - if not self.has_listener(event_type, listener): - listeners = self._events.get(event_type, []) - listeners.append(listener) - self._events[event_type] = listeners - - def remove_event_listener(self, event_type, listener): - """Remove event listener. """ - # Remove the listener from the event type - if self.has_listener(event_type, listener): - listeners = self._events[event_type] - - if len(listeners) == 1: - # Only this listener remains so remove the key - del self._events[event_type] - - else: - # Update listeners chain - listeners.remove(listener) - self._events[event_type] = listeners diff --git a/watcher/common/messaging/messaging_handler.py b/watcher/common/messaging/messaging_handler.py deleted file mode 100644 index 59093ca6c..000000000 --- a/watcher/common/messaging/messaging_handler.py +++ /dev/null @@ -1,120 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import socket -import threading - -import eventlet -from oslo_config import cfg -from oslo_log import log -import oslo_messaging as om - -from watcher.common import rpc -from watcher._i18n import _LE, _LW - -# NOTE: -# Ubuntu 14.04 forces librabbitmq when kombu is used -# Unfortunately it forces a version that has a crash -# bug. Calling eventlet.monkey_patch() tells kombu -# to use libamqp instead. -eventlet.monkey_patch() - -LOG = log.getLogger(__name__) -CONF = cfg.CONF - - -class MessagingHandler(threading.Thread): - - def __init__(self, publisher_id, topic_name, endpoints, version, - serializer=None): - super(MessagingHandler, self).__init__() - self.publisher_id = publisher_id - self.topic_name = topic_name - self.__endpoints = [] - self.__serializer = serializer - self.__version = version - - self.__server = None - self.__notifier = None - self.__transport = None - self.add_endpoints(endpoints) - - def add_endpoints(self, endpoints): - self.__endpoints.extend(endpoints) - - def remove_endpoint(self, endpoint): - if endpoint in self.__endpoints: - self.__endpoints.remove(endpoint) - - @property - def endpoints(self): - return self.__endpoints - - @property - def transport(self): - return self.__transport - - def build_notifier(self): - serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) - return om.Notifier( - self.__transport, - publisher_id=self.publisher_id, - topic=self.topic_name, - serializer=serializer - ) - - def build_server(self, target): - return om.get_rpc_server(self.__transport, target, - self.__endpoints, - executor='eventlet', - serializer=self.__serializer) - - def _configure(self): - try: - self.__transport = om.get_transport(CONF) - self.__notifier = self.build_notifier() - if len(self.__endpoints): - target = om.Target( - topic=self.topic_name, - # For compatibility, we can override it with 'host' opt - server=CONF.host or socket.getfqdn(), - version=self.__version, - ) - self.__server = self.build_server(target) - else: - LOG.warning( - _LW("No endpoint defined; can only publish events")) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("Messaging configuration error")) - - def run(self): - LOG.debug("configure MessagingHandler for %s" % self.topic_name) - self._configure() - if len(self.__endpoints) > 0: - LOG.debug("Starting up server") - self.__server.start() - - def stop(self): - LOG.debug('Stopped server') - self.__server.stop() - - def publish_event(self, event_type, payload, request_id=None): - self.__notifier.info( - {'version_api': self.__version, - 'request_id': request_id}, - {'event_id': event_type}, payload - ) diff --git a/watcher/common/messaging/notification_handler.py b/watcher/common/messaging/notification_handler.py deleted file mode 100644 index 4c67ab1a1..000000000 --- a/watcher/common/messaging/notification_handler.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import eventlet -import oslo_messaging as messaging - -from watcher.common.messaging.utils import observable - - -eventlet.monkey_patch() - - -class NotificationHandler(observable.Observable): - def __init__(self, publisher_id): - super(NotificationHandler, self).__init__() - self.publisher_id = publisher_id - - def info(self, ctx, publisher_id, event_type, payload, metadata): - if publisher_id == self.publisher_id: - self.set_changed() - self.notify(ctx, publisher_id, event_type, metadata, payload) - return messaging.NotificationResult.HANDLED - - def warn(self, ctx, publisher_id, event_type, payload, metadata): - if publisher_id == self.publisher_id: - self.set_changed() - self.notify(ctx, publisher_id, event_type, metadata, payload) - return messaging.NotificationResult.HANDLED - - def error(self, ctx, publisher_id, event_type, payload, metadata): - if publisher_id == self.publisher_id: - self.set_changed() - self.notify(ctx, publisher_id, event_type, metadata, payload) - return messaging.NotificationResult.HANDLED diff --git a/watcher/common/messaging/utils/__init__.py b/watcher/common/messaging/utils/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/common/messaging/utils/observable.py b/watcher/common/observable.py similarity index 100% rename from watcher/common/messaging/utils/observable.py rename to watcher/common/observable.py diff --git a/watcher/common/service.py b/watcher/common/service.py index a3bab67a5..13c8194b1 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -17,6 +17,7 @@ import datetime import socket +import eventlet from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import _options @@ -27,12 +28,10 @@ from oslo_reports import opts as gmr_opts from oslo_service import service from oslo_service import wsgi -from watcher._i18n import _, _LI +from watcher._i18n import _ from watcher.api import app from watcher.common import config from watcher.common import context -from watcher.common.messaging.events import event_dispatcher as dispatcher -from watcher.common.messaging import messaging_handler from watcher.common import rpc from watcher.common import scheduling from watcher import objects @@ -40,6 +39,13 @@ from watcher.objects import base from watcher import opts from watcher import version +# NOTE: +# Ubuntu 14.04 forces librabbitmq when kombu is used +# Unfortunately it forces a version that has a crash +# bug. Calling eventlet.monkey_patch() tells kombu +# to use libamqp instead. +eventlet.monkey_patch() + service_opts = [ cfg.IntOpt('periodic_interval', default=60, @@ -153,7 +159,7 @@ class ServiceHeartbeat(scheduling.BackgroundSchedulerService): """ -class Service(service.ServiceBase, dispatcher.EventDispatcher): +class Service(service.ServiceBase): API_VERSION = '1.0' @@ -248,9 +254,16 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.status_client = c def build_topic_handler(self, topic_name, endpoints=()): - return messaging_handler.MessagingHandler( - self.publisher_id, topic_name, [self.manager] + list(endpoints), - self.api_version, self.serializer) + serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) + target = om.Target( + topic=topic_name, + # For compatibility, we can override it with 'host' opt + server=CONF.host or socket.getfqdn(), + version=self.api_version, + ) + return om.get_rpc_server( + self.transport, target, endpoints, + executor='eventlet', serializer=serializer) def build_notification_handler(self, topic_names, endpoints=()): serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) @@ -290,34 +303,11 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): def wait(self): """Wait for service to complete.""" - def publish_control(self, event, payload): - return self.conductor_topic_handler.publish_event(event, payload) - - def publish_status_event(self, event, payload, request_id=None): - if self.status_topic_handler: - return self.status_topic_handler.publish_event( - event, payload, request_id) - else: - LOG.info( - _LI("No status notifier declared: notification '%s' not sent"), - event) - - def get_version(self): - return self.api_version - - def check_api_version(self, context): + def check_api_version(self, ctx): api_manager_version = self.conductor_client.call( - context, 'check_api_version', - api_version=self.api_version) + ctx, 'check_api_version', api_version=self.api_version) return api_manager_version - def response(self, evt, ctx, message): - payload = { - 'request_id': ctx['request_id'], - 'msg': message - } - self.publish_status_event(evt, payload) - def launch(conf, service_, workers=1, restart_method='reload'): return service.launch(conf, service_, workers, restart_method) diff --git a/watcher/common/messaging/utils/synchronization.py b/watcher/common/synchronization.py similarity index 100% rename from watcher/common/messaging/utils/synchronization.py rename to watcher/common/synchronization.py diff --git a/watcher/decision_engine/audit/base.py b/watcher/decision_engine/audit/base.py index 7357cfaf5..3163feca5 100644 --- a/watcher/decision_engine/audit/base.py +++ b/watcher/decision_engine/audit/base.py @@ -22,8 +22,6 @@ import six from oslo_log import log -from watcher.common.messaging.events import event as watcher_event -from watcher.decision_engine.messaging import events as de_events from watcher.decision_engine.planner import manager as planner_manager from watcher.decision_engine.strategy.context import default as default_context from watcher import objects @@ -72,19 +70,11 @@ class AuditHandler(BaseAuditHandler): def strategy_context(self): return self._strategy_context - def notify(self, audit_uuid, event_type, status): - event = watcher_event.Event() - event.type = event_type - event.data = {} - payload = {'audit_uuid': audit_uuid, - 'audit_status': status} - self.messaging.publish_status_event(event.type.name, payload) - - def update_audit_state(self, audit, state): + @staticmethod + def update_audit_state(audit, state): LOG.debug("Update audit state: %s", state) audit.state = state audit.save() - self.notify(audit.uuid, de_events.Events.TRIGGER_AUDIT, state) def pre_execute(self, audit, request_context): LOG.debug("Trigger audit %s", audit.uuid) diff --git a/watcher/decision_engine/messaging/events.py b/watcher/decision_engine/messaging/events.py deleted file mode 100644 index 403837085..000000000 --- a/watcher/decision_engine/messaging/events.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import enum - - -class Events(enum.Enum): - ALL = '*', - ACTION_PLAN = "action_plan" - TRIGGER_AUDIT = "trigger_audit" diff --git a/watcher/decision_engine/model/mapping.py b/watcher/decision_engine/model/mapping.py index c349dfcf5..f370e6868 100644 --- a/watcher/decision_engine/model/mapping.py +++ b/watcher/decision_engine/model/mapping.py @@ -14,8 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading - +from oslo_concurrency import lockutils from oslo_log import log from watcher._i18n import _LW @@ -28,7 +27,6 @@ class Mapping(object): self.model = model self.compute_node_mapping = {} self.instance_mapping = {} - self.lock = threading.Lock() def map(self, node, instance): """Select the node where the instance is launched @@ -36,9 +34,7 @@ class Mapping(object): :param node: the node :param instance: the virtual machine or instance """ - try: - self.lock.acquire() - + with lockutils.lock(__name__): # init first if node.uuid not in self.compute_node_mapping.keys(): self.compute_node_mapping[node.uuid] = set() @@ -49,9 +45,6 @@ class Mapping(object): # map instance => node self.instance_mapping[instance.uuid] = node.uuid - finally: - self.lock.release() - def unmap(self, node, instance): """Remove the instance from the node @@ -65,8 +58,7 @@ class Mapping(object): :rtype : object """ - try: - self.lock.acquire() + with lockutils.lock(__name__): if str(node_uuid) in self.compute_node_mapping: self.compute_node_mapping[str(node_uuid)].remove( str(instance_uuid)) @@ -77,8 +69,6 @@ class Mapping(object): _LW("Trying to delete the instance %(instance)s but it " "was not found on node %(node)s") % {'instance': instance_uuid, 'node': node_uuid}) - finally: - self.lock.release() def get_mapping(self): return self.compute_node_mapping diff --git a/watcher/decision_engine/rpcapi.py b/watcher/decision_engine/rpcapi.py index 53a953fd9..58d289669 100644 --- a/watcher/decision_engine/rpcapi.py +++ b/watcher/decision_engine/rpcapi.py @@ -20,7 +20,6 @@ from oslo_config import cfg from watcher.common import exception -from watcher.common.messaging import notification_handler from watcher.common import service from watcher.common import utils from watcher.decision_engine import manager @@ -78,7 +77,7 @@ class DecisionEngineAPIManager(object): @property def status_endpoints(self): - return [notification_handler.NotificationHandler] + return [] @property def notification_endpoints(self): diff --git a/watcher/tests/applier/action_plan/test_default_action_handler.py b/watcher/tests/applier/action_plan/test_default_action_handler.py index d6219700a..a59e57a19 100644 --- a/watcher/tests/applier/action_plan/test_default_action_handler.py +++ b/watcher/tests/applier/action_plan/test_default_action_handler.py @@ -19,7 +19,6 @@ import mock from watcher.applier.action_plan import default -from watcher.applier.messaging import event_types as ev from watcher.objects import action_plan as ap_objects from watcher.tests.db import base from watcher.tests.objects import utils as obj_utils @@ -41,20 +40,3 @@ class TestDefaultActionPlanHandler(base.DbTestCase): action_plan = ap_objects.ActionPlan.get_by_uuid( self.context, self.action_plan.uuid) self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state) - - def test_trigger_audit_send_notification(self): - messaging = mock.MagicMock() - command = default.DefaultActionPlanHandler( - self.context, messaging, self.action_plan.uuid) - command.execute() - - call_on_going = mock.call(ev.EventTypes.LAUNCH_ACTION_PLAN.name, { - 'action_plan_state': ap_objects.State.ONGOING, - 'action_plan__uuid': self.action_plan.uuid}) - call_succeeded = mock.call(ev.EventTypes.LAUNCH_ACTION_PLAN.name, { - 'action_plan_state': ap_objects.State.SUCCEEDED, - 'action_plan__uuid': self.action_plan.uuid}) - - calls = [call_on_going, call_succeeded] - messaging.publish_status_event.assert_has_calls(calls) - self.assertEqual(2, messaging.publish_status_event.call_count) diff --git a/watcher/tests/applier/test_applier_manager.py b/watcher/tests/applier/test_applier_manager.py index 19532ad4e..9dae195df 100644 --- a/watcher/tests/applier/test_applier_manager.py +++ b/watcher/tests/applier/test_applier_manager.py @@ -19,8 +19,8 @@ import mock +import oslo_messaging as om from watcher.applier import manager as applier_manager -from watcher.common.messaging import messaging_handler from watcher.common import service from watcher.tests import base @@ -30,8 +30,8 @@ class TestApplierManager(base.TestCase): super(TestApplierManager, self).setUp() self.applier = service.Service(applier_manager.ApplierManager) - @mock.patch.object(messaging_handler.MessagingHandler, "stop") - @mock.patch.object(messaging_handler.MessagingHandler, "start") + @mock.patch.object(om.rpc.server.RPCServer, "stop") + @mock.patch.object(om.rpc.server.RPCServer, "start") def test_start(self, m_messaging_start, m_messaging_stop): self.applier.start() self.applier.stop() diff --git a/watcher/tests/applier/test_rpcapi.py b/watcher/tests/applier/test_rpcapi.py index 5980cd42b..306d85bad 100644 --- a/watcher/tests/applier/test_rpcapi.py +++ b/watcher/tests/applier/test_rpcapi.py @@ -27,14 +27,11 @@ from watcher.tests import base class TestApplierAPI(base.TestCase): - def setUp(self): - super(TestApplierAPI, self).setUp() api = rpcapi.ApplierAPI() - def test_get_version(self): - expected_version = self.api.API_VERSION - self.assertEqual(expected_version, self.api.get_version()) + def setUp(self): + super(TestApplierAPI, self).setUp() def test_get_api_version(self): with mock.patch.object(om.RPCClient, 'call') as mock_call: diff --git a/watcher/tests/common/messaging/__init__.py b/watcher/tests/common/messaging/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/tests/common/messaging/event/__init__.py b/watcher/tests/common/messaging/event/__init__.py deleted file mode 100644 index 0ec1612b3..000000000 --- a/watcher/tests/common/messaging/event/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'bcom' diff --git a/watcher/tests/common/messaging/event/test_event_dispatcher.py b/watcher/tests/common/messaging/event/test_event_dispatcher.py deleted file mode 100644 index 184bd6165..000000000 --- a/watcher/tests/common/messaging/event/test_event_dispatcher.py +++ /dev/null @@ -1,84 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import mock - -from mock import call -from watcher.common.messaging.events import event as messaging_event -from watcher.common.messaging.events import event_dispatcher -from watcher.decision_engine.messaging import events as messaging_events -from watcher.tests import base - - -class TestEventDispatcher(base.TestCase): - - def setUp(self): - super(TestEventDispatcher, self).setUp() - self.event_dispatcher = event_dispatcher.EventDispatcher() - - def fake_listener(self): - return mock.MagicMock() - - def fake_event(self, event_type): - event = messaging_event.Event() - event.type = event_type - return event - - def test_add_listener(self): - listener = self.fake_listener() - self.event_dispatcher.add_event_listener(messaging_events.Events.ALL, - listener) - - self.assertTrue(self.event_dispatcher.has_listener( - messaging_events.Events.ALL, listener)) - - def test_remove_listener(self): - listener = self.fake_listener() - self.event_dispatcher.add_event_listener(messaging_events.Events.ALL, - listener) - self.event_dispatcher.remove_event_listener( - messaging_events.Events.ALL, listener) - - self.assertFalse(self.event_dispatcher.has_listener( - messaging_events.Events.TRIGGER_AUDIT, listener)) - - def test_dispatch_event(self): - listener = self.fake_listener() - event = self.fake_event(messaging_events.Events.TRIGGER_AUDIT) - self.event_dispatcher.add_event_listener( - messaging_events.Events.TRIGGER_AUDIT, listener) - - self.event_dispatcher.dispatch_event(event) - listener.assert_has_calls(calls=[call(event)]) - - def test_dispatch_event_to_all_listener(self): - event = self.fake_event(messaging_events.Events.ACTION_PLAN) - listener_all = self.fake_listener() - listener_action_plan = self.fake_listener() - listener_trigger_audit = self.fake_listener() - - self.event_dispatcher.add_event_listener( - messaging_events.Events.ALL, listener_all) - self.event_dispatcher.add_event_listener( - messaging_events.Events.ACTION_PLAN, listener_action_plan) - - self.event_dispatcher.add_event_listener( - messaging_events.Events.TRIGGER_AUDIT, listener_trigger_audit) - - self.event_dispatcher.dispatch_event(event) - listener_all.assert_has_calls(calls=[call(event)]) - listener_action_plan.assert_has_calls(calls=[call(event)]) - listener_trigger_audit.assert_has_calls([]) diff --git a/watcher/tests/common/messaging/test_messaging_handler.py b/watcher/tests/common/messaging/test_messaging_handler.py deleted file mode 100644 index fef4fbceb..000000000 --- a/watcher/tests/common/messaging/test_messaging_handler.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import mock -from oslo_config import cfg -import oslo_messaging as messaging -from watcher.common.messaging import messaging_handler -from watcher.tests import base - -CONF = cfg.CONF - - -class TestMessagingHandler(base.TestCase): - - PUBLISHER_ID = 'TEST_API' - TOPIC_WATCHER = 'TEST_TOPIC_WATCHER' - ENDPOINT = 'http://fake-fqdn:1337' - VERSION = "1.0" - - def setUp(self): - super(TestMessagingHandler, self).setUp() - CONF.set_default('host', 'fake-fqdn') - - @mock.patch.object(messaging, "get_rpc_server") - @mock.patch.object(messaging, "Target") - def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server): - m_target = mock.Mock() - m_target_cls.return_value = m_target - handler = messaging_handler.MessagingHandler( - publisher_id=self.PUBLISHER_ID, - topic_name=self.TOPIC_WATCHER, - endpoints=[self.ENDPOINT], - version=self.VERSION, - serializer=None, - ) - - handler.run() - - m_target_cls.assert_called_once_with( - server="fake-fqdn", - topic="TEST_TOPIC_WATCHER", - version="1.0", - ) - m_get_rpc_server.assert_called_once_with( - handler.transport, - m_target, - [self.ENDPOINT], - executor='eventlet', - serializer=None, - ) - - def test_messaging_handler_remove_endpoint(self): - handler = messaging_handler.MessagingHandler( - publisher_id=self.PUBLISHER_ID, - topic_name=self.TOPIC_WATCHER, - endpoints=[self.ENDPOINT], - version=self.VERSION, - serializer=None, - ) - - self.assertEqual([self.ENDPOINT], handler.endpoints) - - handler.remove_endpoint(self.ENDPOINT) - - self.assertEqual([], handler.endpoints) diff --git a/watcher/tests/common/messaging/test_notification_handler.py b/watcher/tests/common/messaging/test_notification_handler.py deleted file mode 100644 index 8e26967b2..000000000 --- a/watcher/tests/common/messaging/test_notification_handler.py +++ /dev/null @@ -1,56 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import mock -import oslo_messaging as messaging -from watcher.common.messaging import notification_handler -from watcher.common.messaging.utils import observable -from watcher.tests import base - -PUBLISHER_ID = 'TEST_API' - - -class TestNotificationHandler(base.TestCase): - - def setUp(self): - super(TestNotificationHandler, self).setUp() - self.notification_handler = notification_handler.NotificationHandler( - PUBLISHER_ID) - - def _test_notify(self, level_to_call): - ctx = {} - publisher_id = PUBLISHER_ID - event_type = 'Test' - payload = {} - metadata = {} - - with mock.patch.object(observable.Observable, 'notify') as mock_call: - notification_result = level_to_call(ctx, publisher_id, event_type, - payload, metadata) - self.assertEqual(messaging.NotificationResult.HANDLED, - notification_result) - mock_call.assert_called_once_with(ctx, publisher_id, event_type, - metadata, payload) - - def test_notify_info(self): - self._test_notify(self.notification_handler.info) - - def test_notify_warn(self): - self._test_notify(self.notification_handler.warn) - - def test_notify_error(self): - self._test_notify(self.notification_handler.error) diff --git a/watcher/tests/common/test_service.py b/watcher/tests/common/test_service.py index 80b9bed74..d6f3473c5 100644 --- a/watcher/tests/common/test_service.py +++ b/watcher/tests/common/test_service.py @@ -19,7 +19,7 @@ import mock from oslo_config import cfg -from watcher.common.messaging import messaging_handler +import oslo_messaging as om from watcher.common import rpc from watcher.common import service from watcher import objects @@ -81,13 +81,13 @@ class TestService(base.TestCase): def setUp(self): super(TestService, self).setUp() - @mock.patch.object(messaging_handler, "MessagingHandler") + @mock.patch.object(om.rpc.server, "RPCServer") def test_start(self, m_handler): dummy_service = service.Service(DummyManager) dummy_service.start() self.assertEqual(2, m_handler.call_count) - @mock.patch.object(messaging_handler, "MessagingHandler") + @mock.patch.object(om.rpc.server, "RPCServer") def test_stop(self, m_handler): dummy_service = service.Service(DummyManager) dummy_service.stop() @@ -98,6 +98,8 @@ class TestService(base.TestCase): dummy_service = service.Service(DummyManager) handler = dummy_service.build_topic_handler(topic_name) self.assertIsNotNone(handler) + self.assertIsInstance(handler, om.rpc.server.RPCServer) + self.assertEqual("mytopic", handler._target.topic) def test_init_service(self): dummy_service = service.Service(DummyManager) @@ -105,58 +107,7 @@ class TestService(base.TestCase): rpc.RequestContextSerializer) self.assertIsInstance( dummy_service.conductor_topic_handler, - messaging_handler.MessagingHandler) + om.rpc.server.RPCServer) self.assertIsInstance( dummy_service.status_topic_handler, - messaging_handler.MessagingHandler) - - @mock.patch.object(messaging_handler, "MessagingHandler") - def test_publish_control(self, m_handler_cls): - m_handler = mock.Mock() - m_handler_cls.return_value = m_handler - payload = { - "name": "value", - } - event = "myevent" - dummy_service = service.Service(DummyManager) - dummy_service.publish_control(event, payload) - m_handler.publish_event.assert_called_once_with(event, payload) - - @mock.patch.object(messaging_handler, "MessagingHandler") - def test_publish_status_event(self, m_handler_cls): - m_handler = mock.Mock() - m_handler_cls.return_value = m_handler - payload = { - "name": "value", - } - event = "myevent" - dummy_service = service.Service(DummyManager) - dummy_service.publish_status_event(event, payload) - m_handler.publish_event.assert_called_once_with(event, payload, None) - - @mock.patch.object(service.Service, 'publish_status_event') - def test_response(self, mock_call): - event = "My event" - context = {'request_id': 12} - message = "My Message" - - dummy_service = service.Service(DummyManager) - dummy_service.response(event, context, message) - - expected_payload = { - 'request_id': context['request_id'], - 'msg': message - } - mock_call.assert_called_once_with(event, expected_payload) - - def test_messaging_build_topic_handler(self): - dummy_service = service.Service(DummyManager) - topic = dummy_service.build_topic_handler("conductor_topic") - - self.assertIsInstance(topic, messaging_handler.MessagingHandler) - self.assertEqual("pub_id", dummy_service.publisher_id) - self.assertEqual("pub_id", topic.publisher_id) - - self.assertEqual("conductor_topic", - dummy_service.conductor_topic_handler.topic_name) - self.assertEqual("conductor_topic", topic.topic_name) + om.rpc.server.RPCServer) diff --git a/watcher/tests/decision_engine/audit/test_audit_handlers.py b/watcher/tests/decision_engine/audit/test_audit_handlers.py index 36db3596d..63d7fb26f 100644 --- a/watcher/tests/decision_engine/audit/test_audit_handlers.py +++ b/watcher/tests/decision_engine/audit/test_audit_handlers.py @@ -21,7 +21,6 @@ import mock from watcher.decision_engine.audit import continuous from watcher.decision_engine.audit import oneshot -from watcher.decision_engine.messaging import events from watcher.decision_engine.model.collector import manager from watcher.objects import audit as audit_objects from watcher.tests.db import base @@ -57,25 +56,6 @@ class TestOneShotAuditHandler(base.DbTestCase): audit = audit_objects.Audit.get_by_uuid(self.context, self.audit.uuid) self.assertEqual(audit_objects.State.SUCCEEDED, audit.state) - @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - def test_trigger_audit_send_notification(self, mock_collector): - messaging = mock.MagicMock() - mock_collector.return_value = faker.FakerModelCollector() - audit_handler = oneshot.OneShotAuditHandler(messaging) - audit_handler.execute(self.audit, self.context) - - call_on_going = mock.call(events.Events.TRIGGER_AUDIT.name, { - 'audit_status': audit_objects.State.ONGOING, - 'audit_uuid': self.audit.uuid}) - call_succeeded = mock.call(events.Events.TRIGGER_AUDIT.name, { - 'audit_status': audit_objects.State.SUCCEEDED, - 'audit_uuid': self.audit.uuid}) - - calls = [call_on_going, call_succeeded] - messaging.publish_status_event.assert_has_calls(calls) - self.assertEqual( - 2, messaging.publish_status_event.call_count) - class TestContinuousAuditHandler(base.DbTestCase): def setUp(self): diff --git a/watcher/tests/decision_engine/test_rpcapi.py b/watcher/tests/decision_engine/test_rpcapi.py index f62a92b85..e168cba48 100644 --- a/watcher/tests/decision_engine/test_rpcapi.py +++ b/watcher/tests/decision_engine/test_rpcapi.py @@ -29,10 +29,6 @@ class TestDecisionEngineAPI(base.TestCase): api = rpcapi.DecisionEngineAPI() - def test_get_version(self): - expected_version = self.api.API_VERSION - self.assertEqual(expected_version, self.api.get_version()) - def test_get_api_version(self): with mock.patch.object(om.RPCClient, 'call') as mock_call: expected_context = self.context