Use common rpc pattern for all services
There is a commonly shared and proven rpc pattern used across most OpenStack services that is already implemented in watcher, but the functions are not used. This patch basically makes use of the existing rpc classes and removes some unnecessary code. Change-Id: I57424561e0675a836d10b712ef1579a334f72018
This commit is contained in:
@@ -121,22 +121,40 @@ class RequestContextSerializer(messaging.Serializer):
|
||||
def get_client(target, version_cap=None, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return messaging.RPCClient(TRANSPORT,
|
||||
target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer)
|
||||
return messaging.RPCClient(
|
||||
TRANSPORT,
|
||||
target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer
|
||||
)
|
||||
|
||||
|
||||
def get_server(target, endpoints, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return messaging.get_rpc_server(TRANSPORT,
|
||||
target,
|
||||
endpoints,
|
||||
executor='eventlet',
|
||||
serializer=serializer,
|
||||
access_policy=access_policy)
|
||||
return messaging.get_rpc_server(
|
||||
TRANSPORT,
|
||||
target,
|
||||
endpoints,
|
||||
executor='eventlet',
|
||||
serializer=serializer,
|
||||
access_policy=access_policy
|
||||
)
|
||||
|
||||
|
||||
def get_notification_listener(targets, endpoints, serializer=None, pool=None):
|
||||
assert NOTIFICATION_TRANSPORT is not None
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return messaging.get_notification_listener(
|
||||
NOTIFICATION_TRANSPORT,
|
||||
targets,
|
||||
endpoints,
|
||||
allow_requeue=False,
|
||||
executor='eventlet',
|
||||
pool=pool,
|
||||
serializer=serializer
|
||||
)
|
||||
|
||||
|
||||
def get_notifier(publisher_id):
|
||||
|
||||
@@ -21,14 +21,12 @@ from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import _options
|
||||
from oslo_log import log
|
||||
import oslo_messaging as om
|
||||
import oslo_messaging as messaging
|
||||
from oslo_reports import guru_meditation_report as gmr
|
||||
from oslo_reports import opts as gmr_opts
|
||||
from oslo_service import service
|
||||
from oslo_service import wsgi
|
||||
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
|
||||
from watcher._i18n import _
|
||||
from watcher.api import app
|
||||
from watcher.common import config
|
||||
@@ -183,11 +181,6 @@ class Service(service.ServiceBase):
|
||||
]
|
||||
self.notification_endpoints = self.manager.notification_endpoints
|
||||
|
||||
self.serializer = rpc.RequestContextSerializer(
|
||||
base.WatcherObjectSerializer())
|
||||
|
||||
self._transport = None
|
||||
self._notification_transport = None
|
||||
self._conductor_client = None
|
||||
|
||||
self.conductor_topic_handler = None
|
||||
@@ -201,27 +194,17 @@ class Service(service.ServiceBase):
|
||||
self.notification_topics, self.notification_endpoints
|
||||
)
|
||||
|
||||
@property
|
||||
def transport(self):
|
||||
if self._transport is None:
|
||||
self._transport = om.get_rpc_transport(CONF)
|
||||
return self._transport
|
||||
|
||||
@property
|
||||
def notification_transport(self):
|
||||
if self._notification_transport is None:
|
||||
self._notification_transport = om.get_notification_transport(CONF)
|
||||
return self._notification_transport
|
||||
|
||||
@property
|
||||
def conductor_client(self):
|
||||
if self._conductor_client is None:
|
||||
target = om.Target(
|
||||
target = messaging.Target(
|
||||
topic=self.conductor_topic,
|
||||
version=self.API_VERSION,
|
||||
)
|
||||
self._conductor_client = om.RPCClient(
|
||||
self.transport, target, serializer=self.serializer)
|
||||
self._conductor_client = rpc.get_client(
|
||||
target,
|
||||
serializer=base.WatcherObjectSerializer()
|
||||
)
|
||||
return self._conductor_client
|
||||
|
||||
@conductor_client.setter
|
||||
@@ -229,21 +212,18 @@ class Service(service.ServiceBase):
|
||||
self.conductor_client = c
|
||||
|
||||
def build_topic_handler(self, topic_name, endpoints=()):
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
|
||||
target = om.Target(
|
||||
target = messaging.Target(
|
||||
topic=topic_name,
|
||||
# For compatibility, we can override it with 'host' opt
|
||||
server=CONF.host or socket.gethostname(),
|
||||
version=self.api_version,
|
||||
)
|
||||
return om.get_rpc_server(
|
||||
self.transport, target, endpoints,
|
||||
executor='eventlet', serializer=serializer,
|
||||
access_policy=access_policy)
|
||||
return rpc.get_server(
|
||||
target, endpoints,
|
||||
serializer=rpc.JsonPayloadSerializer()
|
||||
)
|
||||
|
||||
def build_notification_handler(self, topic_names, endpoints=()):
|
||||
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
|
||||
targets = []
|
||||
for topic in topic_names:
|
||||
kwargs = {}
|
||||
@@ -251,11 +231,13 @@ class Service(service.ServiceBase):
|
||||
exchange, topic = topic.split('.')
|
||||
kwargs['exchange'] = exchange
|
||||
kwargs['topic'] = topic
|
||||
targets.append(om.Target(**kwargs))
|
||||
return om.get_notification_listener(
|
||||
self.notification_transport, targets, endpoints,
|
||||
executor='eventlet', serializer=serializer,
|
||||
allow_requeue=False, pool=CONF.host)
|
||||
targets.append(messaging.Target(**kwargs))
|
||||
|
||||
return rpc.get_notification_listener(
|
||||
targets, endpoints,
|
||||
serializer=rpc.JsonPayloadSerializer(),
|
||||
pool=CONF.host
|
||||
)
|
||||
|
||||
def start(self):
|
||||
LOG.debug("Connecting to '%s'", CONF.transport_url)
|
||||
|
||||
@@ -20,7 +20,6 @@ from unittest import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
import oslo_messaging as om
|
||||
from watcher.common import rpc
|
||||
from watcher.common import service
|
||||
from watcher import objects
|
||||
from watcher.tests import base
|
||||
@@ -102,8 +101,6 @@ class TestService(base.TestCase):
|
||||
|
||||
def test_init_service(self):
|
||||
dummy_service = service.Service(DummyManager)
|
||||
self.assertIsInstance(dummy_service.serializer,
|
||||
rpc.RequestContextSerializer)
|
||||
self.assertIsInstance(
|
||||
dummy_service.conductor_topic_handler,
|
||||
om.rpc.server.RPCServer)
|
||||
|
||||
Reference in New Issue
Block a user