Merge "Use common rpc pattern for all services"

This commit is contained in:
Zuul
2021-02-03 10:09:25 +00:00
committed by Gerrit Code Review
3 changed files with 46 additions and 49 deletions

View File

@@ -121,22 +121,40 @@ class RequestContextSerializer(messaging.Serializer):
def get_client(target, version_cap=None, serializer=None): def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer) serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT, return messaging.RPCClient(
target, TRANSPORT,
version_cap=version_cap, target,
serializer=serializer) version_cap=version_cap,
serializer=serializer
)
def get_server(target, endpoints, serializer=None): def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None assert TRANSPORT is not None
access_policy = dispatcher.DefaultRPCAccessPolicy access_policy = dispatcher.DefaultRPCAccessPolicy
serializer = RequestContextSerializer(serializer) serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT, return messaging.get_rpc_server(
target, TRANSPORT,
endpoints, target,
executor='eventlet', endpoints,
serializer=serializer, executor='eventlet',
access_policy=access_policy) serializer=serializer,
access_policy=access_policy
)
def get_notification_listener(targets, endpoints, serializer=None, pool=None):
assert NOTIFICATION_TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_notification_listener(
NOTIFICATION_TRANSPORT,
targets,
endpoints,
allow_requeue=False,
executor='eventlet',
pool=pool,
serializer=serializer
)
def get_notifier(publisher_id): def get_notifier(publisher_id):

View File

@@ -21,14 +21,12 @@ from oslo_concurrency import processutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import _options from oslo_log import _options
from oslo_log import log from oslo_log import log
import oslo_messaging as om import oslo_messaging as messaging
from oslo_reports import guru_meditation_report as gmr from oslo_reports import guru_meditation_report as gmr
from oslo_reports import opts as gmr_opts from oslo_reports import opts as gmr_opts
from oslo_service import service from oslo_service import service
from oslo_service import wsgi from oslo_service import wsgi
from oslo_messaging.rpc import dispatcher
from watcher._i18n import _ from watcher._i18n import _
from watcher.api import app from watcher.api import app
from watcher.common import config from watcher.common import config
@@ -183,11 +181,6 @@ class Service(service.ServiceBase):
] ]
self.notification_endpoints = self.manager.notification_endpoints self.notification_endpoints = self.manager.notification_endpoints
self.serializer = rpc.RequestContextSerializer(
base.WatcherObjectSerializer())
self._transport = None
self._notification_transport = None
self._conductor_client = None self._conductor_client = None
self.conductor_topic_handler = None self.conductor_topic_handler = None
@@ -201,27 +194,17 @@ class Service(service.ServiceBase):
self.notification_topics, self.notification_endpoints self.notification_topics, self.notification_endpoints
) )
@property
def transport(self):
if self._transport is None:
self._transport = om.get_rpc_transport(CONF)
return self._transport
@property
def notification_transport(self):
if self._notification_transport is None:
self._notification_transport = om.get_notification_transport(CONF)
return self._notification_transport
@property @property
def conductor_client(self): def conductor_client(self):
if self._conductor_client is None: if self._conductor_client is None:
target = om.Target( target = messaging.Target(
topic=self.conductor_topic, topic=self.conductor_topic,
version=self.API_VERSION, version=self.API_VERSION,
) )
self._conductor_client = om.RPCClient( self._conductor_client = rpc.get_client(
self.transport, target, serializer=self.serializer) target,
serializer=base.WatcherObjectSerializer()
)
return self._conductor_client return self._conductor_client
@conductor_client.setter @conductor_client.setter
@@ -229,21 +212,18 @@ class Service(service.ServiceBase):
self.conductor_client = c self.conductor_client = c
def build_topic_handler(self, topic_name, endpoints=()): def build_topic_handler(self, topic_name, endpoints=()):
access_policy = dispatcher.DefaultRPCAccessPolicy target = messaging.Target(
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
target = om.Target(
topic=topic_name, topic=topic_name,
# For compatibility, we can override it with 'host' opt # For compatibility, we can override it with 'host' opt
server=CONF.host or socket.gethostname(), server=CONF.host or socket.gethostname(),
version=self.api_version, version=self.api_version,
) )
return om.get_rpc_server( return rpc.get_server(
self.transport, target, endpoints, target, endpoints,
executor='eventlet', serializer=serializer, serializer=rpc.JsonPayloadSerializer()
access_policy=access_policy) )
def build_notification_handler(self, topic_names, endpoints=()): def build_notification_handler(self, topic_names, endpoints=()):
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
targets = [] targets = []
for topic in topic_names: for topic in topic_names:
kwargs = {} kwargs = {}
@@ -251,11 +231,13 @@ class Service(service.ServiceBase):
exchange, topic = topic.split('.') exchange, topic = topic.split('.')
kwargs['exchange'] = exchange kwargs['exchange'] = exchange
kwargs['topic'] = topic kwargs['topic'] = topic
targets.append(om.Target(**kwargs)) targets.append(messaging.Target(**kwargs))
return om.get_notification_listener(
self.notification_transport, targets, endpoints, return rpc.get_notification_listener(
executor='eventlet', serializer=serializer, targets, endpoints,
allow_requeue=False, pool=CONF.host) serializer=rpc.JsonPayloadSerializer(),
pool=CONF.host
)
def start(self): def start(self):
LOG.debug("Connecting to '%s'", CONF.transport_url) LOG.debug("Connecting to '%s'", CONF.transport_url)

View File

@@ -20,7 +20,6 @@ from unittest import mock
from oslo_config import cfg from oslo_config import cfg
import oslo_messaging as om import oslo_messaging as om
from watcher.common import rpc
from watcher.common import service from watcher.common import service
from watcher import objects from watcher import objects
from watcher.tests import base from watcher.tests import base
@@ -102,8 +101,6 @@ class TestService(base.TestCase):
def test_init_service(self): def test_init_service(self):
dummy_service = service.Service(DummyManager) dummy_service = service.Service(DummyManager)
self.assertIsInstance(dummy_service.serializer,
rpc.RequestContextSerializer)
self.assertIsInstance( self.assertIsInstance(
dummy_service.conductor_topic_handler, dummy_service.conductor_topic_handler,
om.rpc.server.RPCServer) om.rpc.server.RPCServer)