diff --git a/watcher/applier/manager.py b/watcher/applier/manager.py index 5661d9343..82b9be179 100644 --- a/watcher/applier/manager.py +++ b/watcher/applier/manager.py @@ -21,7 +21,6 @@ from oslo_config import cfg from oslo_log import log from watcher.applier.messaging import trigger -from watcher.common.messaging import messaging_core LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -63,17 +62,15 @@ CONF.register_group(opt_group) CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) -class ApplierManager(messaging_core.MessagingCore): - def __init__(self): - super(ApplierManager, self).__init__( - CONF.watcher_applier.publisher_id, - CONF.watcher_applier.conductor_topic, - CONF.watcher_applier.status_topic, - api_version=self.API_VERSION, - ) - self.conductor_topic_handler.add_endpoint( - trigger.TriggerActionPlan(self)) +class ApplierManager(object): - def join(self): - self.conductor_topic_handler.join() - self.status_topic_handler.join() + API_VERSION = '1.0' + + conductor_endpoints = [trigger.TriggerActionPlan] + status_endpoints = [] + + def __init__(self): + self.publisher_id = CONF.watcher_applier.publisher_id + self.conductor_topic = CONF.watcher_applier.conductor_topic + self.status_topic = CONF.watcher_applier.status_topic + self.api_version = self.API_VERSION diff --git a/watcher/applier/rpcapi.py b/watcher/applier/rpcapi.py index 607d3882e..9eac24de1 100644 --- a/watcher/applier/rpcapi.py +++ b/watcher/applier/rpcapi.py @@ -18,48 +18,43 @@ # from oslo_config import cfg from oslo_log import log -import oslo_messaging as om -from watcher.applier.manager import APPLIER_MANAGER_OPTS -from watcher.applier.manager import opt_group +from watcher.applier import manager from watcher.common import exception -from watcher.common.messaging import messaging_core from watcher.common.messaging import notification_handler as notification +from watcher.common import service from watcher.common import utils LOG = log.getLogger(__name__) CONF = cfg.CONF -CONF.register_group(opt_group) -CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) +CONF.register_group(manager.opt_group) +CONF.register_opts(manager.APPLIER_MANAGER_OPTS, manager.opt_group) -class ApplierAPI(messaging_core.MessagingCore): +class ApplierAPI(service.Service): def __init__(self): - super(ApplierAPI, self).__init__( - CONF.watcher_applier.publisher_id, - CONF.watcher_applier.conductor_topic, - CONF.watcher_applier.status_topic, - api_version=self.API_VERSION, - ) - self.handler = notification.NotificationHandler(self.publisher_id) - self.handler.register_observer(self) - self.status_topic_handler.add_endpoint(self.handler) - transport = om.get_transport(CONF) - - target = om.Target( - topic=CONF.watcher_applier.conductor_topic, - version=self.API_VERSION, - ) - - self.client = om.RPCClient(transport, target, - serializer=self.serializer) + super(ApplierAPI, self).__init__(ApplierAPIManager) def launch_action_plan(self, context, action_plan_uuid=None): if not utils.is_uuid_like(action_plan_uuid): raise exception.InvalidUuidOrName(name=action_plan_uuid) - return self.client.call( + return self.conductor_client.call( context.to_dict(), 'launch_action_plan', action_plan_uuid=action_plan_uuid) + + +class ApplierAPIManager(object): + + API_VERSION = '1.0' + + conductor_endpoints = [] + status_endpoints = [notification.NotificationHandler] + + def __init__(self): + self.publisher_id = CONF.watcher_applier.publisher_id + self.conductor_topic = CONF.watcher_applier.conductor_topic + self.status_topic = CONF.watcher_applier.status_topic + self.api_version = self.API_VERSION diff --git a/watcher/cmd/applier.py b/watcher/cmd/applier.py index 0cff9c532..fd6eb2463 100644 --- a/watcher/cmd/applier.py +++ b/watcher/cmd/applier.py @@ -17,29 +17,29 @@ """Starter script for the Applier service.""" -import logging as std_logging import os import sys from oslo_config import cfg from oslo_log import log as logging +from oslo_reports import guru_meditation_report as gmr +from oslo_service import service -from watcher import _i18n +from watcher._i18n import _LI from watcher.applier import manager -from watcher.common import service +from watcher.common import service as watcher_service +from watcher import version LOG = logging.getLogger(__name__) CONF = cfg.CONF -_LI = _i18n._LI def main(): - service.prepare_service(sys.argv) + watcher_service.prepare_service(sys.argv) + gmr.TextGuruMeditation.setup_autorun(version) - LOG.info(_LI('Starting server in PID %s') % os.getpid()) - LOG.debug("Configuration:") - cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) + LOG.info(_LI('Starting Watcher Applier service in PID %s'), os.getpid()) - server = manager.ApplierManager() - server.connect() - server.join() + applier_service = watcher_service.Service(manager.ApplierManager) + launcher = service.launch(CONF, applier_service) + launcher.wait() diff --git a/watcher/cmd/decisionengine.py b/watcher/cmd/decisionengine.py index 63b70aa42..df4979248 100644 --- a/watcher/cmd/decisionengine.py +++ b/watcher/cmd/decisionengine.py @@ -17,30 +17,30 @@ """Starter script for the Decision Engine manager service.""" -import logging as std_logging import os import sys from oslo_config import cfg from oslo_log import log as logging +from oslo_reports import guru_meditation_report as gmr +from oslo_service import service -from watcher import _i18n -from watcher.common import service +from watcher._i18n import _LI +from watcher.common import service as watcher_service from watcher.decision_engine import manager - +from watcher import version LOG = logging.getLogger(__name__) CONF = cfg.CONF -_LI = _i18n._LI def main(): - service.prepare_service(sys.argv) + watcher_service.prepare_service(sys.argv) + gmr.TextGuruMeditation.setup_autorun(version) - LOG.info(_LI('Starting server in PID %s') % os.getpid()) - LOG.debug("Configuration:") - cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) + LOG.info(_LI('Starting Watcher Decision Engine service in PID %s'), + os.getpid()) - server = manager.DecisionEngineManager() - server.connect() - server.join() + de_service = watcher_service.Service(manager.DecisionEngineManager) + launcher = service.launch(CONF, de_service) + launcher.wait() diff --git a/watcher/common/messaging/messaging_core.py b/watcher/common/messaging/messaging_core.py deleted file mode 100644 index f1eb6b1f2..000000000 --- a/watcher/common/messaging/messaging_core.py +++ /dev/null @@ -1,122 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_config import cfg -from oslo_log import log -import oslo_messaging as om - -from watcher.common.messaging.events import event_dispatcher as dispatcher -from watcher.common.messaging import messaging_handler -from watcher.common import rpc - -from watcher.objects import base - -LOG = log.getLogger(__name__) -CONF = cfg.CONF - - -class MessagingCore(dispatcher.EventDispatcher): - - API_VERSION = '1.0' - - def __init__(self, publisher_id, conductor_topic, status_topic, - api_version=API_VERSION): - super(MessagingCore, self).__init__() - self.serializer = rpc.RequestContextSerializer( - base.WatcherObjectSerializer()) - self.publisher_id = publisher_id - self.api_version = api_version - - self.conductor_topic = conductor_topic - self.status_topic = status_topic - self.conductor_topic_handler = self.build_topic_handler( - conductor_topic) - self.status_topic_handler = self.build_topic_handler(status_topic) - - self._conductor_client = None - self._status_client = None - - @property - def conductor_client(self): - if self._conductor_client is None: - transport = om.get_transport(CONF) - target = om.Target( - topic=self.conductor_topic, - version=self.API_VERSION, - ) - self._conductor_client = om.RPCClient( - transport, target, serializer=self.serializer) - return self._conductor_client - - @conductor_client.setter - def conductor_client(self, c): - self.conductor_client = c - - @property - def status_client(self): - if self._status_client is None: - transport = om.get_transport(CONF) - target = om.Target( - topic=self.status_topic, - version=self.API_VERSION, - ) - self._status_client = om.RPCClient( - transport, target, serializer=self.serializer) - return self._status_client - - @status_client.setter - def status_client(self, c): - self.status_client = c - - def build_topic_handler(self, topic_name): - return messaging_handler.MessagingHandler( - self.publisher_id, topic_name, self, - self.api_version, self.serializer) - - def connect(self): - LOG.debug("Connecting to '%s' (%s)", - CONF.transport_url, CONF.rpc_backend) - self.conductor_topic_handler.start() - self.status_topic_handler.start() - - def disconnect(self): - LOG.debug("Disconnecting from '%s' (%s)", - CONF.transport_url, CONF.rpc_backend) - self.conductor_topic_handler.stop() - self.status_topic_handler.stop() - - def publish_control(self, event, payload): - return self.conductor_topic_handler.publish_event(event, payload) - - def publish_status(self, event, payload, request_id=None): - return self.status_topic_handler.publish_event( - event, payload, request_id) - - def get_version(self): - return self.api_version - - def check_api_version(self, context): - api_manager_version = self.conductor_client.call( - context.to_dict(), 'check_api_version', - api_version=self.api_version) - return api_manager_version - - def response(self, evt, ctx, message): - payload = { - 'request_id': ctx['request_id'], - 'msg': message - } - self.publish_status(evt, payload) diff --git a/watcher/common/messaging/messaging_handler.py b/watcher/common/messaging/messaging_handler.py index eb18946ad..4404222c1 100644 --- a/watcher/common/messaging/messaging_handler.py +++ b/watcher/common/messaging/messaging_handler.py @@ -38,7 +38,7 @@ CONF = cfg.CONF class MessagingHandler(threading.Thread): - def __init__(self, publisher_id, topic_name, endpoint, version, + def __init__(self, publisher_id, topic_name, endpoints, version, serializer=None): super(MessagingHandler, self).__init__() self.publisher_id = publisher_id @@ -50,10 +50,10 @@ class MessagingHandler(threading.Thread): self.__server = None self.__notifier = None self.__transport = None - self.add_endpoint(endpoint) + self.add_endpoints(endpoints) - def add_endpoint(self, endpoint): - self.__endpoints.append(endpoint) + def add_endpoints(self, endpoints): + self.__endpoints.extend(endpoints) def remove_endpoint(self, endpoint): if endpoint in self.__endpoints: diff --git a/watcher/common/service.py b/watcher/common/service.py index 91da31dbd..1bd0bb06d 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -21,6 +21,7 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import _options from oslo_log import log +import oslo_messaging as om from oslo_reports import opts as gmr_opts from oslo_service import service from oslo_service import wsgi @@ -28,7 +29,10 @@ from oslo_service import wsgi from watcher._i18n import _ from watcher.api import app from watcher.common import config - +from watcher.common.messaging.events import event_dispatcher as dispatcher +from watcher.common.messaging import messaging_handler +from watcher.common import rpc +from watcher.objects import base service_opts = [ cfg.IntOpt('periodic_interval', @@ -92,6 +96,116 @@ class WSGIService(service.ServiceBase): self.server.reset() +class Service(service.ServiceBase, dispatcher.EventDispatcher): + + API_VERSION = '1.0' + + def __init__(self, manager_class): + super(Service, self).__init__() + self.manager = manager_class() + + self.publisher_id = self.manager.publisher_id + self.api_version = self.manager.API_VERSION + self.conductor_topic = self.manager.conductor_topic + self.status_topic = self.manager.status_topic + + self.conductor_endpoints = [ + ep(self) for ep in self.manager.conductor_endpoints + ] + self.status_endpoints = [ + ep(self.publisher_id) for ep in self.manager.status_endpoints + ] + + self.serializer = rpc.RequestContextSerializer( + base.WatcherObjectSerializer()) + + self.conductor_topic_handler = self.build_topic_handler( + self.conductor_topic, self.conductor_endpoints) + self.status_topic_handler = self.build_topic_handler( + self.status_topic, self.status_endpoints) + + self._conductor_client = None + self._status_client = None + + @property + def conductor_client(self): + if self._conductor_client is None: + transport = om.get_transport(CONF) + target = om.Target( + topic=self.conductor_topic, + version=self.API_VERSION, + ) + self._conductor_client = om.RPCClient( + transport, target, serializer=self.serializer) + return self._conductor_client + + @conductor_client.setter + def conductor_client(self, c): + self.conductor_client = c + + @property + def status_client(self): + if self._status_client is None: + transport = om.get_transport(CONF) + target = om.Target( + topic=self.status_topic, + version=self.API_VERSION, + ) + self._status_client = om.RPCClient( + transport, target, serializer=self.serializer) + return self._status_client + + @status_client.setter + def status_client(self, c): + self.status_client = c + + def build_topic_handler(self, topic_name, endpoints=()): + return messaging_handler.MessagingHandler( + self.publisher_id, topic_name, [self.manager] + list(endpoints), + self.api_version, self.serializer) + + def start(self): + LOG.debug("Connecting to '%s' (%s)", + CONF.transport_url, CONF.rpc_backend) + self.conductor_topic_handler.start() + self.status_topic_handler.start() + + def stop(self): + LOG.debug("Disconnecting from '%s' (%s)", + CONF.transport_url, CONF.rpc_backend) + self.conductor_topic_handler.stop() + self.status_topic_handler.stop() + + def reset(self): + """Reset a service in case it received a SIGHUP.""" + + def wait(self): + """Wait for service to complete.""" + + def publish_control(self, event, payload): + return self.conductor_topic_handler.publish_event(event, payload) + + def publish_status(self, event, payload, request_id=None): + return self.status_topic_handler.publish_event( + event, payload, request_id) + + def get_version(self): + return self.api_version + + def check_api_version(self, context): + api_manager_version = self.conductor_client.call( + context.to_dict(), 'check_api_version', + api_version=self.api_version) + return api_manager_version + + def response(self, evt, ctx, message): + payload = { + 'request_id': ctx['request_id'], + 'msg': message + } + self.publish_status(evt, payload) + + def process_launcher(conf=cfg.CONF): return service.ProcessLauncher(conf) diff --git a/watcher/decision_engine/manager.py b/watcher/decision_engine/manager.py index 56446e26d..009ddef5f 100644 --- a/watcher/decision_engine/manager.py +++ b/watcher/decision_engine/manager.py @@ -39,7 +39,6 @@ See :doc:`../architecture` for more details on this component. from oslo_config import cfg -from watcher.common.messaging import messaging_core from watcher.decision_engine.messaging import audit_endpoint @@ -76,18 +75,15 @@ CONF.register_group(decision_engine_opt_group) CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) -class DecisionEngineManager(messaging_core.MessagingCore): - def __init__(self): - super(DecisionEngineManager, self).__init__( - CONF.watcher_decision_engine.publisher_id, - CONF.watcher_decision_engine.conductor_topic, - CONF.watcher_decision_engine.status_topic, - api_version=self.API_VERSION) - endpoint = audit_endpoint.AuditEndpoint( - self, - max_workers=CONF.watcher_decision_engine.max_workers) - self.conductor_topic_handler.add_endpoint(endpoint) +class DecisionEngineManager(object): - def join(self): - self.conductor_topic_handler.join() - self.status_topic_handler.join() + API_VERSION = '1.0' + + conductor_endpoints = [audit_endpoint.AuditEndpoint] + status_endpoints = [] + + def __init__(self): + self.publisher_id = CONF.watcher_decision_engine.publisher_id + self.conductor_topic = CONF.watcher_decision_engine.conductor_topic + self.status_topic = CONF.watcher_decision_engine.status_topic + self.api_version = self.API_VERSION diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index 18fe27e02..ce606b4e2 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -18,17 +18,20 @@ # from concurrent import futures +from oslo_config import cfg from oslo_log import log from watcher.decision_engine.audit import default +CONF = cfg.CONF LOG = log.getLogger(__name__) class AuditEndpoint(object): - def __init__(self, messaging, max_workers): + def __init__(self, messaging): self._messaging = messaging - self._executor = futures.ThreadPoolExecutor(max_workers=max_workers) + self._executor = futures.ThreadPoolExecutor( + max_workers=CONF.watcher_decision_engine.max_workers) @property def executor(self): diff --git a/watcher/decision_engine/rpcapi.py b/watcher/decision_engine/rpcapi.py index ddfa36103..6c56c2083 100644 --- a/watcher/decision_engine/rpcapi.py +++ b/watcher/decision_engine/rpcapi.py @@ -20,31 +20,23 @@ from oslo_config import cfg from watcher.common import exception -from watcher.common.messaging import messaging_core from watcher.common.messaging import notification_handler +from watcher.common import service from watcher.common import utils -from watcher.decision_engine.manager import decision_engine_opt_group -from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS +from watcher.decision_engine import manager CONF = cfg.CONF -CONF.register_group(decision_engine_opt_group) -CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) +CONF.register_group(manager.decision_engine_opt_group) +CONF.register_opts(manager.WATCHER_DECISION_ENGINE_OPTS, + manager.decision_engine_opt_group) -class DecisionEngineAPI(messaging_core.MessagingCore): +class DecisionEngineAPI(service.Service): def __init__(self): - super(DecisionEngineAPI, self).__init__( - CONF.watcher_decision_engine.publisher_id, - CONF.watcher_decision_engine.conductor_topic, - CONF.watcher_decision_engine.status_topic, - api_version=self.API_VERSION, - ) - self.handler = notification_handler.NotificationHandler( - self.publisher_id) - self.status_topic_handler.add_endpoint(self.handler) + super(DecisionEngineAPI, self).__init__(DecisionEngineAPIManager) def trigger_audit(self, context, audit_uuid=None): if not utils.is_uuid_like(audit_uuid): @@ -52,3 +44,17 @@ class DecisionEngineAPI(messaging_core.MessagingCore): return self.conductor_client.call( context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid) + + +class DecisionEngineAPIManager(object): + + API_VERSION = '1.0' + + conductor_endpoints = [] + status_endpoints = [notification_handler.NotificationHandler] + + def __init__(self): + self.publisher_id = CONF.watcher_decision_engine.publisher_id + self.conductor_topic = CONF.watcher_decision_engine.conductor_topic + self.status_topic = CONF.watcher_decision_engine.status_topic + self.api_version = self.API_VERSION diff --git a/watcher/tests/applier/test_applier_manager.py b/watcher/tests/applier/test_applier_manager.py index a890486ca..1c6f8c566 100644 --- a/watcher/tests/applier/test_applier_manager.py +++ b/watcher/tests/applier/test_applier_manager.py @@ -18,22 +18,22 @@ # from mock import patch -from threading import Thread -from watcher.applier.manager import ApplierManager -from watcher.common.messaging.messaging_core import MessagingCore +from watcher.applier import manager as applier_manager +from watcher.common.messaging import messaging_handler +from watcher.common import service from watcher.tests import base class TestApplierManager(base.TestCase): def setUp(self): super(TestApplierManager, self).setUp() - self.applier = ApplierManager() + self.applier = service.Service(applier_manager.ApplierManager) - @patch.object(MessagingCore, "connect") - @patch.object(Thread, "join") - def test_connect(self, m_messaging, m_thread): - self.applier.connect() - self.applier.join() - self.assertEqual(2, m_messaging.call_count) - self.assertEqual(1, m_thread.call_count) + @patch.object(messaging_handler.MessagingHandler, "stop") + @patch.object(messaging_handler.MessagingHandler, "start") + def test_start(self, m_messaging_start, m_messaging_stop): + self.applier.start() + self.applier.stop() + self.assertEqual(2, m_messaging_start.call_count) + self.assertEqual(2, m_messaging_stop.call_count) diff --git a/watcher/tests/cmd/test_applier.py b/watcher/tests/cmd/test_applier.py index 205403f61..01eb8b9fc 100644 --- a/watcher/tests/cmd/test_applier.py +++ b/watcher/tests/cmd/test_applier.py @@ -19,9 +19,10 @@ from __future__ import unicode_literals import types -from mock import patch +import mock from oslo_config import cfg -from watcher.applier.manager import ApplierManager +from oslo_service import service + from watcher.cmd import applier from watcher.tests.base import BaseTestCase @@ -43,9 +44,7 @@ class TestApplier(BaseTestCase): super(TestApplier, self).tearDown() self.conf._parse_cli_opts = self._parse_cli_opts - @patch.object(ApplierManager, "connect") - @patch.object(ApplierManager, "join") - def test_run_applier_app(self, m_connect, m_join): + @mock.patch.object(service, "launch") + def test_run_applier_app(self, m_launch): applier.main() - self.assertEqual(1, m_connect.call_count) - self.assertEqual(1, m_join.call_count) + self.assertEqual(1, m_launch.call_count) diff --git a/watcher/tests/cmd/test_decision_engine.py b/watcher/tests/cmd/test_decision_engine.py index 54612bd1a..573be5596 100644 --- a/watcher/tests/cmd/test_decision_engine.py +++ b/watcher/tests/cmd/test_decision_engine.py @@ -19,15 +19,15 @@ from __future__ import unicode_literals import types -from mock import patch +import mock from oslo_config import cfg -from watcher.decision_engine.manager import DecisionEngineManager -from watcher.tests.base import BaseTestCase +from oslo_service import service from watcher.cmd import decisionengine +from watcher.tests import base -class TestDecisionEngine(BaseTestCase): +class TestDecisionEngine(base.BaseTestCase): def setUp(self): super(TestDecisionEngine, self).setUp() @@ -45,9 +45,7 @@ class TestDecisionEngine(BaseTestCase): super(TestDecisionEngine, self).tearDown() self.conf._parse_cli_opts = self._parse_cli_opts - @patch.object(DecisionEngineManager, "connect") - @patch.object(DecisionEngineManager, "join") - def test_run_de_app(self, m_connect, m_join): + @mock.patch.object(service, "launch") + def test_run_de_app(self, m_launch): decisionengine.main() - self.assertEqual(1, m_connect.call_count) - self.assertEqual(1, m_join.call_count) + self.assertEqual(1, m_launch.call_count) diff --git a/watcher/tests/common/messaging/test_messaging_handler.py b/watcher/tests/common/messaging/test_messaging_handler.py index 2dddeb13c..3e24a78af 100644 --- a/watcher/tests/common/messaging/test_messaging_handler.py +++ b/watcher/tests/common/messaging/test_messaging_handler.py @@ -42,7 +42,7 @@ class TestMessagingHandler(base.TestCase): handler = messaging_handler.MessagingHandler( publisher_id=self.PUBLISHER_ID, topic_name=self.TOPIC_WATCHER, - endpoint=self.ENDPOINT, + endpoints=[self.ENDPOINT], version=self.VERSION, serializer=None, ) @@ -65,7 +65,7 @@ class TestMessagingHandler(base.TestCase): handler = messaging_handler.MessagingHandler( publisher_id=self.PUBLISHER_ID, topic_name=self.TOPIC_WATCHER, - endpoint=self.ENDPOINT, + endpoints=[self.ENDPOINT], version=self.VERSION, serializer=None, ) diff --git a/watcher/tests/common/messaging/test_messaging_core.py b/watcher/tests/common/test_service.py similarity index 56% rename from watcher/tests/common/messaging/test_messaging_core.py rename to watcher/tests/common/test_service.py index 52e70eab1..5fe782a6d 100644 --- a/watcher/tests/common/messaging/test_messaging_core.py +++ b/watcher/tests/common/test_service.py @@ -17,44 +17,58 @@ import mock -from watcher.common.messaging import messaging_core from watcher.common.messaging import messaging_handler from watcher.common import rpc +from watcher.common import service from watcher.tests import base -class TestMessagingCore(base.TestCase): +class DummyManager(object): + + API_VERSION = '1.0' + + conductor_endpoints = [] + status_endpoints = [] + + def __init__(self): + self.publisher_id = "pub_id" + self.conductor_topic = "conductor_topic" + self.status_topic = "status_topic" + self.api_version = self.API_VERSION + + +class TestService(base.TestCase): def setUp(self): - super(TestMessagingCore, self).setUp() + super(TestService, self).setUp() @mock.patch.object(messaging_handler, "MessagingHandler") - def test_connect(self, m_handler): - messaging = messaging_core.MessagingCore("", "", "") - messaging.connect() + def test_start(self, m_handler): + dummy_service = service.Service(DummyManager) + dummy_service.start() self.assertEqual(2, m_handler.call_count) @mock.patch.object(messaging_handler, "MessagingHandler") - def test_disconnect(self, m_handler): - messaging = messaging_core.MessagingCore("", "", "") - messaging.disconnect() + def test_stop(self, m_handler): + dummy_service = service.Service(DummyManager) + dummy_service.stop() self.assertEqual(2, m_handler.call_count) def test_build_topic_handler(self): - topic_name = "MyTopic" - messaging = messaging_core.MessagingCore("", "", "") - handler = messaging.build_topic_handler(topic_name) + topic_name = "mytopic" + dummy_service = service.Service(DummyManager) + handler = dummy_service.build_topic_handler(topic_name) self.assertIsNotNone(handler) - def test_init_messaging_core(self): - messaging = messaging_core.MessagingCore("", "", "") - self.assertIsInstance(messaging.serializer, + def test_init_service(self): + dummy_service = service.Service(DummyManager) + self.assertIsInstance(dummy_service.serializer, rpc.RequestContextSerializer) self.assertIsInstance( - messaging.conductor_topic_handler, + dummy_service.conductor_topic_handler, messaging_handler.MessagingHandler) self.assertIsInstance( - messaging.status_topic_handler, + dummy_service.status_topic_handler, messaging_handler.MessagingHandler) @mock.patch.object(messaging_handler, "MessagingHandler") @@ -64,9 +78,9 @@ class TestMessagingCore(base.TestCase): payload = { "name": "value", } - event = "MyEvent" - messaging = messaging_core.MessagingCore("", "", "") - messaging.publish_control(event, payload) + event = "myevent" + dummy_service = service.Service(DummyManager) + dummy_service.publish_control(event, payload) m_handler.publish_event.assert_called_once_with(event, payload) @mock.patch.object(messaging_handler, "MessagingHandler") @@ -76,19 +90,19 @@ class TestMessagingCore(base.TestCase): payload = { "name": "value", } - event = "MyEvent" - messaging = messaging_core.MessagingCore("", "", "") - messaging.publish_status(event, payload) + event = "myevent" + dummy_service = service.Service(DummyManager) + dummy_service.publish_status(event, payload) m_handler.publish_event.assert_called_once_with(event, payload, None) - @mock.patch.object(messaging_core.MessagingCore, 'publish_status') + @mock.patch.object(service.Service, 'publish_status') def test_response(self, mock_call): event = "My event" context = {'request_id': 12} message = "My Message" - messaging = messaging_core.MessagingCore("", "", "") - messaging.response(event, context, message) + dummy_service = service.Service(DummyManager) + dummy_service.response(event, context, message) expected_payload = { 'request_id': context['request_id'], @@ -97,14 +111,13 @@ class TestMessagingCore(base.TestCase): mock_call.assert_called_once_with(event, expected_payload) def test_messaging_build_topic_handler(self): - messaging = messaging_core.MessagingCore( - "pub_id", "test_topic", "does not matter") - topic = messaging.build_topic_handler("test_topic") + dummy_service = service.Service(DummyManager) + topic = dummy_service.build_topic_handler("conductor_topic") self.assertIsInstance(topic, messaging_handler.MessagingHandler) - self.assertEqual("pub_id", messaging.publisher_id) + self.assertEqual("pub_id", dummy_service.publisher_id) self.assertEqual("pub_id", topic.publisher_id) - self.assertEqual("test_topic", - messaging.conductor_topic_handler.topic_name) - self.assertEqual("test_topic", topic.topic_name) + self.assertEqual("conductor_topic", + dummy_service.conductor_topic_handler.topic_name) + self.assertEqual("conductor_topic", topic.topic_name) diff --git a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py index 9288da920..289828a65 100644 --- a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py +++ b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py @@ -13,6 +13,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. + import mock from watcher.common import utils @@ -41,7 +42,7 @@ class TestAuditEndpoint(DbTestCase): audit_uuid = utils.generate_uuid() audit_handler = DefaultAuditHandler(mock.MagicMock()) - endpoint = AuditEndpoint(audit_handler, max_workers=2) + endpoint = AuditEndpoint(audit_handler) with mock.patch.object(DefaultAuditHandler, 'execute') as mock_call: mock_call.return_value = 0 @@ -54,7 +55,7 @@ class TestAuditEndpoint(DbTestCase): mock_collector.return_value = FakerModelCollector() audit_uuid = utils.generate_uuid() audit_handler = DefaultAuditHandler(mock.MagicMock()) - endpoint = AuditEndpoint(audit_handler, max_workers=2) + endpoint = AuditEndpoint(audit_handler) with mock.patch.object(DefaultAuditHandler, 'execute') \ as mock_call: