diff --git a/watcher/common/rpc.py b/watcher/common/rpc.py index 49197a0ea..bdd30fb9a 100644 --- a/watcher/common/rpc.py +++ b/watcher/common/rpc.py @@ -121,22 +121,40 @@ class RequestContextSerializer(messaging.Serializer): def get_client(target, version_cap=None, serializer=None): assert TRANSPORT is not None serializer = RequestContextSerializer(serializer) - return messaging.RPCClient(TRANSPORT, - target, - version_cap=version_cap, - serializer=serializer) + return messaging.RPCClient( + TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer + ) def get_server(target, endpoints, serializer=None): assert TRANSPORT is not None access_policy = dispatcher.DefaultRPCAccessPolicy serializer = RequestContextSerializer(serializer) - return messaging.get_rpc_server(TRANSPORT, - target, - endpoints, - executor='eventlet', - serializer=serializer, - access_policy=access_policy) + return messaging.get_rpc_server( + TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer, + access_policy=access_policy + ) + + +def get_notification_listener(targets, endpoints, serializer=None, pool=None): + assert NOTIFICATION_TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.get_notification_listener( + NOTIFICATION_TRANSPORT, + targets, + endpoints, + allow_requeue=False, + executor='eventlet', + pool=pool, + serializer=serializer + ) def get_notifier(publisher_id): diff --git a/watcher/common/service.py b/watcher/common/service.py index 077ec4606..816b05533 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -21,14 +21,12 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import _options from oslo_log import log -import oslo_messaging as om +import oslo_messaging as messaging from oslo_reports import guru_meditation_report as gmr from oslo_reports import opts as gmr_opts from oslo_service import service from oslo_service import wsgi -from oslo_messaging.rpc import dispatcher - from watcher._i18n import _ from watcher.api import app from watcher.common import config @@ -183,11 +181,6 @@ class Service(service.ServiceBase): ] self.notification_endpoints = self.manager.notification_endpoints - self.serializer = rpc.RequestContextSerializer( - base.WatcherObjectSerializer()) - - self._transport = None - self._notification_transport = None self._conductor_client = None self.conductor_topic_handler = None @@ -201,27 +194,17 @@ class Service(service.ServiceBase): self.notification_topics, self.notification_endpoints ) - @property - def transport(self): - if self._transport is None: - self._transport = om.get_rpc_transport(CONF) - return self._transport - - @property - def notification_transport(self): - if self._notification_transport is None: - self._notification_transport = om.get_notification_transport(CONF) - return self._notification_transport - @property def conductor_client(self): if self._conductor_client is None: - target = om.Target( + target = messaging.Target( topic=self.conductor_topic, version=self.API_VERSION, ) - self._conductor_client = om.RPCClient( - self.transport, target, serializer=self.serializer) + self._conductor_client = rpc.get_client( + target, + serializer=base.WatcherObjectSerializer() + ) return self._conductor_client @conductor_client.setter @@ -229,21 +212,18 @@ class Service(service.ServiceBase): self.conductor_client = c def build_topic_handler(self, topic_name, endpoints=()): - access_policy = dispatcher.DefaultRPCAccessPolicy - serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) - target = om.Target( + target = messaging.Target( topic=topic_name, # For compatibility, we can override it with 'host' opt server=CONF.host or socket.gethostname(), version=self.api_version, ) - return om.get_rpc_server( - self.transport, target, endpoints, - executor='eventlet', serializer=serializer, - access_policy=access_policy) + return rpc.get_server( + target, endpoints, + serializer=rpc.JsonPayloadSerializer() + ) def build_notification_handler(self, topic_names, endpoints=()): - serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) targets = [] for topic in topic_names: kwargs = {} @@ -251,11 +231,13 @@ class Service(service.ServiceBase): exchange, topic = topic.split('.') kwargs['exchange'] = exchange kwargs['topic'] = topic - targets.append(om.Target(**kwargs)) - return om.get_notification_listener( - self.notification_transport, targets, endpoints, - executor='eventlet', serializer=serializer, - allow_requeue=False, pool=CONF.host) + targets.append(messaging.Target(**kwargs)) + + return rpc.get_notification_listener( + targets, endpoints, + serializer=rpc.JsonPayloadSerializer(), + pool=CONF.host + ) def start(self): LOG.debug("Connecting to '%s'", CONF.transport_url) diff --git a/watcher/tests/common/test_service.py b/watcher/tests/common/test_service.py index bce8353ca..151f3c4da 100644 --- a/watcher/tests/common/test_service.py +++ b/watcher/tests/common/test_service.py @@ -20,7 +20,6 @@ from unittest import mock from oslo_config import cfg import oslo_messaging as om -from watcher.common import rpc from watcher.common import service from watcher import objects from watcher.tests import base @@ -102,8 +101,6 @@ class TestService(base.TestCase): def test_init_service(self): dummy_service = service.Service(DummyManager) - self.assertIsInstance(dummy_service.serializer, - rpc.RequestContextSerializer) self.assertIsInstance( dummy_service.conductor_topic_handler, om.rpc.server.RPCServer)