Removed 'watcher_messaging' to use oslo.messaging
The old 'watcher_messaging' section of the Watcher configuration file has now been replaced by the more standard oslo.configuration one. DocImpact Change-Id: Ie027df023e6133f3188e57b42846083f28c282bd
This commit is contained in:
@@ -14,7 +14,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from watcher.common.messaging.events.event_dispatcher import \
|
||||
@@ -28,60 +27,33 @@ from watcher.objects.base import WatcherObjectSerializer
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
WATCHER_MESSAGING_OPTS = [
|
||||
cfg.StrOpt('notifier_driver',
|
||||
default='messaging', help='The name of the driver used by'
|
||||
' oslo messaging'),
|
||||
cfg.StrOpt('executor',
|
||||
default='blocking', help='The name of a message executor, for'
|
||||
'example: eventlet, blocking'),
|
||||
cfg.StrOpt('protocol',
|
||||
default='rabbit', help='The protocol used by the message'
|
||||
' broker, for example rabbit'),
|
||||
cfg.StrOpt('user',
|
||||
default='guest', help='The username used by the message '
|
||||
'broker'),
|
||||
cfg.StrOpt('password',
|
||||
default='guest', help='The password of user used by the '
|
||||
'message broker'),
|
||||
cfg.StrOpt('host',
|
||||
default='localhost', help='The host where the message broker'
|
||||
'is installed'),
|
||||
cfg.StrOpt('port',
|
||||
default='5672', help='The port used bythe message broker'),
|
||||
cfg.StrOpt('virtual_host',
|
||||
default='', help='The virtual host used by the message '
|
||||
'broker')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
opt_group = cfg.OptGroup(name='watcher_messaging',
|
||||
title='Options for the messaging core')
|
||||
CONF.register_group(opt_group)
|
||||
CONF.register_opts(WATCHER_MESSAGING_OPTS, opt_group)
|
||||
|
||||
|
||||
class MessagingCore(EventDispatcher):
|
||||
|
||||
API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, publisher_id, topic_control, topic_status):
|
||||
EventDispatcher.__init__(self)
|
||||
def __init__(self, publisher_id, topic_control, topic_status,
|
||||
api_version=API_VERSION):
|
||||
super(MessagingCore, self).__init__()
|
||||
self.serializer = RequestContextSerializer(WatcherObjectSerializer())
|
||||
self.publisher_id = publisher_id
|
||||
self.api_version = api_version
|
||||
self.topic_control = self.build_topic(topic_control)
|
||||
self.topic_status = self.build_topic(topic_status)
|
||||
|
||||
def build_topic(self, topic_name):
|
||||
return MessagingHandler(self.publisher_id, topic_name, self,
|
||||
self.API_VERSION, self.serializer)
|
||||
self.api_version, self.serializer)
|
||||
|
||||
def connect(self):
|
||||
LOG.debug("connecting to rabbitMQ broker")
|
||||
LOG.debug("Connecting to '%s' (%s)",
|
||||
CONF.transport_url, CONF.rpc_backend)
|
||||
self.topic_control.start()
|
||||
self.topic_status.start()
|
||||
|
||||
def disconnect(self):
|
||||
LOG.debug("Disconnect to rabbitMQ broker")
|
||||
LOG.debug("Disconnecting from '%s' (%s)",
|
||||
CONF.transport_url, CONF.rpc_backend)
|
||||
self.topic_control.stop()
|
||||
self.topic_status.stop()
|
||||
|
||||
@@ -92,12 +64,12 @@ class MessagingCore(EventDispatcher):
|
||||
return self.topic_status.publish_event(event, payload, request_id)
|
||||
|
||||
def get_version(self):
|
||||
return self.API_VERSION
|
||||
return self.api_version
|
||||
|
||||
def check_api_version(self, context):
|
||||
api_manager_version = self.client.call(
|
||||
context.to_dict(), 'check_api_version',
|
||||
api_version=self.API_VERSION)
|
||||
api_version=self.api_version)
|
||||
return api_manager_version
|
||||
|
||||
def response(self, evt, ctx, message):
|
||||
|
||||
@@ -14,35 +14,42 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import socket
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging as om
|
||||
from threading import Thread
|
||||
from watcher.common.messaging.utils.transport_url_builder import \
|
||||
TransportUrlBuilder
|
||||
from watcher.common.rpc import JsonPayloadSerializer
|
||||
from watcher.common.rpc import RequestContextSerializer
|
||||
|
||||
# NOTE:
|
||||
# Ubuntu 14.04 forces librabbitmq when kombu is used
|
||||
# Unfortunately it forces a version that has a crash
|
||||
# bug. Calling eventlet.monkey_patch() tells kombu
|
||||
# to use libamqp instead.
|
||||
eventlet.monkey_patch()
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class MessagingHandler(Thread):
|
||||
|
||||
def __init__(self, publisher_id, topic_watcher, endpoint, version,
|
||||
serializer=None):
|
||||
Thread.__init__(self)
|
||||
super(MessagingHandler, self).__init__()
|
||||
self.publisher_id = publisher_id
|
||||
self.topic_watcher = topic_watcher
|
||||
self.__endpoints = []
|
||||
self.__serializer = serializer
|
||||
self.__version = version
|
||||
|
||||
self.__server = None
|
||||
self.__notifier = None
|
||||
self.__endpoints = []
|
||||
self.__topics = []
|
||||
self._publisher_id = publisher_id
|
||||
self._topic_watcher = topic_watcher
|
||||
self.__endpoints.append(endpoint)
|
||||
self.__version = version
|
||||
self.__serializer = serializer
|
||||
self.__transport = None
|
||||
self.add_endpoint(endpoint)
|
||||
|
||||
def add_endpoint(self, endpoint):
|
||||
self.__endpoints.append(endpoint)
|
||||
@@ -51,47 +58,50 @@ class MessagingHandler(Thread):
|
||||
if endpoint in self.__endpoints:
|
||||
self.__endpoints.remove(endpoint)
|
||||
|
||||
@property
|
||||
def endpoints(self):
|
||||
return self.__endpoints
|
||||
|
||||
@property
|
||||
def transport(self):
|
||||
return self.__transport
|
||||
|
||||
def build_notifier(self):
|
||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||
return om.Notifier(
|
||||
self.transport,
|
||||
driver=CONF.watcher_messaging.notifier_driver,
|
||||
publisher_id=self._publisher_id,
|
||||
topic=self._topic_watcher,
|
||||
serializer=serializer)
|
||||
self.__transport,
|
||||
publisher_id=self.publisher_id,
|
||||
topic=self.topic_watcher,
|
||||
serializer=serializer
|
||||
)
|
||||
|
||||
def build_server(self, targets):
|
||||
|
||||
return om.get_rpc_server(self.transport, targets,
|
||||
def build_server(self, target):
|
||||
return om.get_rpc_server(self.__transport, target,
|
||||
self.__endpoints,
|
||||
executor=CONF.
|
||||
watcher_messaging.executor,
|
||||
serializer=self.__serializer)
|
||||
|
||||
def __build_transport_url(self):
|
||||
return TransportUrlBuilder().url
|
||||
|
||||
def __config(self):
|
||||
def _configure(self):
|
||||
try:
|
||||
self.transport = om.get_transport(
|
||||
cfg.CONF,
|
||||
url=self.__build_transport_url())
|
||||
self.__transport = om.get_transport(CONF)
|
||||
self.__notifier = self.build_notifier()
|
||||
if 0 < len(self.__endpoints):
|
||||
targets = om.Target(
|
||||
topic=self._topic_watcher,
|
||||
server=CONF.watcher_messaging.host,
|
||||
version=self.__version)
|
||||
self.__server = self.build_server(targets)
|
||||
if len(self.__endpoints):
|
||||
target = om.Target(
|
||||
topic=self.topic_watcher,
|
||||
# For compatibility, we can override it with 'host' opt
|
||||
server=CONF.host or socket.getfqdn(),
|
||||
version=self.__version,
|
||||
)
|
||||
self.__server = self.build_server(target)
|
||||
else:
|
||||
LOG.warn("you have no defined endpoint, \
|
||||
so you can only publish events")
|
||||
LOG.warn("you have no defined endpoint, "
|
||||
"so you can only publish events")
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.error("configure : %s" % str(e.message))
|
||||
|
||||
def run(self):
|
||||
LOG.debug("configure MessagingHandler for %s" % self._topic_watcher)
|
||||
self.__config()
|
||||
LOG.debug("configure MessagingHandler for %s" % self.topic_watcher)
|
||||
self._configure()
|
||||
if len(self.__endpoints) > 0:
|
||||
LOG.debug("Starting up server")
|
||||
self.__server.start()
|
||||
@@ -102,6 +112,8 @@ class MessagingHandler(Thread):
|
||||
self.__server.stop()
|
||||
|
||||
def publish_event(self, event_type, payload, request_id=None):
|
||||
self.__notifier.info({'version_api': self.__version,
|
||||
'request_id': request_id},
|
||||
{'event_id': event_type}, payload)
|
||||
self.__notifier.info(
|
||||
{'version_api': self.__version,
|
||||
'request_id': request_id},
|
||||
{'event_id': event_type}, payload
|
||||
)
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TransportUrlBuilder(object):
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return "%s://%s:%s@%s:%s/%s" % (
|
||||
CONF.watcher_messaging.protocol,
|
||||
CONF.watcher_messaging.user,
|
||||
CONF.watcher_messaging.password,
|
||||
CONF.watcher_messaging.host,
|
||||
CONF.watcher_messaging.port,
|
||||
CONF.watcher_messaging.virtual_host
|
||||
)
|
||||
@@ -1,107 +0,0 @@
|
||||
# Copyright 2014 - Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Common RPC service and API tools for Watcher."""
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from watcher.common import context as watcher_context
|
||||
from watcher.common import rpc
|
||||
from watcher.objects import base as objects_base
|
||||
|
||||
|
||||
# NOTE(paulczar):
|
||||
# Ubuntu 14.04 forces librabbitmq when kombu is used
|
||||
# Unfortunately it forces a version that has a crash
|
||||
# bug. Calling eventlet.monkey_patch() tells kombu
|
||||
# to use libamqp instead.
|
||||
eventlet.monkey_patch()
|
||||
|
||||
# NOTE(asalkeld):
|
||||
# The watcher.openstack.common.rpc entries are for compatability
|
||||
# with devstack rpc_backend configuration values.
|
||||
TRANSPORT_ALIASES = {
|
||||
'watcher.openstack.common.rpc.impl_kombu': 'rabbit',
|
||||
'watcher.openstack.common.rpc.impl_qpid': 'qpid',
|
||||
'watcher.openstack.common.rpc.impl_zmq': 'zmq',
|
||||
}
|
||||
|
||||
|
||||
class RequestContextSerializer(messaging.Serializer):
|
||||
|
||||
def __init__(self, base):
|
||||
self._base = base
|
||||
|
||||
def serialize_entity(self, context, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.serialize_entity(context, entity)
|
||||
|
||||
def deserialize_entity(self, context, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.deserialize_entity(context, entity)
|
||||
|
||||
def serialize_context(self, context):
|
||||
return context.to_dict()
|
||||
|
||||
def deserialize_context(self, context):
|
||||
return watcher_context.RequestContext.from_dict(context)
|
||||
|
||||
|
||||
class Service(object):
|
||||
_server = None
|
||||
|
||||
def __init__(self, topic, server, handlers):
|
||||
serializer = RequestContextSerializer(
|
||||
objects_base.WatcherObjectSerializer())
|
||||
transport = messaging.get_transport(cfg.CONF,
|
||||
aliases=TRANSPORT_ALIASES)
|
||||
# TODO(asalkeld) add support for version='x.y'
|
||||
target = messaging.Target(topic=topic, server=server)
|
||||
self._server = messaging.get_rpc_server(transport, target, handlers,
|
||||
serializer=serializer)
|
||||
|
||||
def serve(self):
|
||||
self._server.start()
|
||||
self._server.wait()
|
||||
|
||||
|
||||
class API(object):
|
||||
def __init__(self, transport=None, context=None, topic=None):
|
||||
serializer = RequestContextSerializer(
|
||||
objects_base.WatcherObjectSerializer())
|
||||
if transport is None:
|
||||
exmods = rpc.get_allowed_exmods()
|
||||
transport = messaging.get_transport(cfg.CONF,
|
||||
allowed_remote_exmods=exmods,
|
||||
aliases=TRANSPORT_ALIASES)
|
||||
self._context = context
|
||||
if topic is None:
|
||||
topic = ''
|
||||
target = messaging.Target(topic=topic)
|
||||
self._client = messaging.RPCClient(transport, target,
|
||||
serializer=serializer)
|
||||
|
||||
def _call(self, method, *args, **kwargs):
|
||||
# import pdb; pdb.set_trace()
|
||||
return self._client.call(self._context, method, *args, **kwargs)
|
||||
|
||||
def _cast(self, method, *args, **kwargs):
|
||||
self._client.cast(self._context, method, *args, **kwargs)
|
||||
|
||||
def echo(self, message):
|
||||
self._cast('echo', message=message)
|
||||
Reference in New Issue
Block a user