From e61f9b5e882d1a6393158a9e1022e8f9eb7a1e09 Mon Sep 17 00:00:00 2001 From: Erik Olof Gunnar Andersson Date: Tue, 19 Jan 2021 00:24:55 -0800 Subject: [PATCH] Use common rpc pattern for all services There is a commonly shared and proven rpc pattern used across most OpenStack services that is already implemented in watcher, but the functions are not used. This patch basically makes use of the existing rpc classes and removes some unnecessary code. Change-Id: I57424561e0675a836d10b712ef1579a334f72018 --- watcher/common/rpc.py | 38 ++++++++++++++------ watcher/common/service.py | 54 ++++++++++------------------ watcher/tests/common/test_service.py | 3 -- 3 files changed, 46 insertions(+), 49 deletions(-) diff --git a/watcher/common/rpc.py b/watcher/common/rpc.py index 49197a0ea..bdd30fb9a 100644 --- a/watcher/common/rpc.py +++ b/watcher/common/rpc.py @@ -121,22 +121,40 @@ class RequestContextSerializer(messaging.Serializer): def get_client(target, version_cap=None, serializer=None): assert TRANSPORT is not None serializer = RequestContextSerializer(serializer) - return messaging.RPCClient(TRANSPORT, - target, - version_cap=version_cap, - serializer=serializer) + return messaging.RPCClient( + TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer + ) def get_server(target, endpoints, serializer=None): assert TRANSPORT is not None access_policy = dispatcher.DefaultRPCAccessPolicy serializer = RequestContextSerializer(serializer) - return messaging.get_rpc_server(TRANSPORT, - target, - endpoints, - executor='eventlet', - serializer=serializer, - access_policy=access_policy) + return messaging.get_rpc_server( + TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer, + access_policy=access_policy + ) + + +def get_notification_listener(targets, endpoints, serializer=None, pool=None): + assert NOTIFICATION_TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return messaging.get_notification_listener( + NOTIFICATION_TRANSPORT, + targets, + endpoints, + allow_requeue=False, + executor='eventlet', + pool=pool, + serializer=serializer + ) def get_notifier(publisher_id): diff --git a/watcher/common/service.py b/watcher/common/service.py index 077ec4606..816b05533 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -21,14 +21,12 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import _options from oslo_log import log -import oslo_messaging as om +import oslo_messaging as messaging from oslo_reports import guru_meditation_report as gmr from oslo_reports import opts as gmr_opts from oslo_service import service from oslo_service import wsgi -from oslo_messaging.rpc import dispatcher - from watcher._i18n import _ from watcher.api import app from watcher.common import config @@ -183,11 +181,6 @@ class Service(service.ServiceBase): ] self.notification_endpoints = self.manager.notification_endpoints - self.serializer = rpc.RequestContextSerializer( - base.WatcherObjectSerializer()) - - self._transport = None - self._notification_transport = None self._conductor_client = None self.conductor_topic_handler = None @@ -201,27 +194,17 @@ class Service(service.ServiceBase): self.notification_topics, self.notification_endpoints ) - @property - def transport(self): - if self._transport is None: - self._transport = om.get_rpc_transport(CONF) - return self._transport - - @property - def notification_transport(self): - if self._notification_transport is None: - self._notification_transport = om.get_notification_transport(CONF) - return self._notification_transport - @property def conductor_client(self): if self._conductor_client is None: - target = om.Target( + target = messaging.Target( topic=self.conductor_topic, version=self.API_VERSION, ) - self._conductor_client = om.RPCClient( - self.transport, target, serializer=self.serializer) + self._conductor_client = rpc.get_client( + target, + serializer=base.WatcherObjectSerializer() + ) return self._conductor_client @conductor_client.setter @@ -229,21 +212,18 @@ class Service(service.ServiceBase): self.conductor_client = c def build_topic_handler(self, topic_name, endpoints=()): - access_policy = dispatcher.DefaultRPCAccessPolicy - serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) - target = om.Target( + target = messaging.Target( topic=topic_name, # For compatibility, we can override it with 'host' opt server=CONF.host or socket.gethostname(), version=self.api_version, ) - return om.get_rpc_server( - self.transport, target, endpoints, - executor='eventlet', serializer=serializer, - access_policy=access_policy) + return rpc.get_server( + target, endpoints, + serializer=rpc.JsonPayloadSerializer() + ) def build_notification_handler(self, topic_names, endpoints=()): - serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) targets = [] for topic in topic_names: kwargs = {} @@ -251,11 +231,13 @@ class Service(service.ServiceBase): exchange, topic = topic.split('.') kwargs['exchange'] = exchange kwargs['topic'] = topic - targets.append(om.Target(**kwargs)) - return om.get_notification_listener( - self.notification_transport, targets, endpoints, - executor='eventlet', serializer=serializer, - allow_requeue=False, pool=CONF.host) + targets.append(messaging.Target(**kwargs)) + + return rpc.get_notification_listener( + targets, endpoints, + serializer=rpc.JsonPayloadSerializer(), + pool=CONF.host + ) def start(self): LOG.debug("Connecting to '%s'", CONF.transport_url) diff --git a/watcher/tests/common/test_service.py b/watcher/tests/common/test_service.py index bce8353ca..151f3c4da 100644 --- a/watcher/tests/common/test_service.py +++ b/watcher/tests/common/test_service.py @@ -20,7 +20,6 @@ from unittest import mock from oslo_config import cfg import oslo_messaging as om -from watcher.common import rpc from watcher.common import service from watcher import objects from watcher.tests import base @@ -102,8 +101,6 @@ class TestService(base.TestCase): def test_init_service(self): dummy_service = service.Service(DummyManager) - self.assertIsInstance(dummy_service.serializer, - rpc.RequestContextSerializer) self.assertIsInstance( dummy_service.conductor_topic_handler, om.rpc.server.RPCServer)