From 8756c70060963aac6f73f514adff1cc604dfe74f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20Fran=C3=A7oise?= Date: Tue, 1 Dec 2015 12:02:45 +0100 Subject: [PATCH] Removed 'watcher_messaging' to use oslo.messaging The old 'watcher_messaging' section of the Watcher configuration file has now been replaced by the more standard oslo.configuration one. DocImpact Change-Id: Ie027df023e6133f3188e57b42846083f28c282bd --- etc/watcher/watcher.conf.sample | 423 ++++++++++++++++-- tox.ini | 1 + watcher/applier/framework/manager_applier.py | 10 +- watcher/applier/framework/rpcapi.py | 23 +- watcher/common/messaging/messaging_core.py | 52 +-- watcher/common/messaging/messaging_handler.py | 94 ++-- .../messaging/utils/transport_url_builder.py | 35 -- watcher/common/rpc_service.py | 107 ----- watcher/decision_engine/manager.py | 10 +- watcher/decision_engine/rpcapi.py | 24 +- watcher/opts.py | 4 - .../common/messaging/test_messaging_core.py | 95 ++-- .../messaging/test_messaging_handler.py | 78 ++++ .../tests/common/messaging/utils/__init__.py | 0 .../utils/test_transport_url_builder.py | 46 -- watcher/tests/conf_fixture.py | 1 - watcher/tests/decision_engine/test_rpcapi.py | 2 +- 17 files changed, 622 insertions(+), 383 deletions(-) delete mode 100644 watcher/common/messaging/utils/transport_url_builder.py delete mode 100644 watcher/common/rpc_service.py create mode 100644 watcher/tests/common/messaging/test_messaging_handler.py delete mode 100644 watcher/tests/common/messaging/utils/__init__.py delete mode 100644 watcher/tests/common/messaging/utils/test_transport_url_builder.py diff --git a/etc/watcher/watcher.conf.sample b/etc/watcher/watcher.conf.sample index e03c4681e..9f89ad5c8 100644 --- a/etc/watcher/watcher.conf.sample +++ b/etc/watcher/watcher.conf.sample @@ -111,6 +111,108 @@ # Enables or disables fatal status of deprecations. (boolean value) #fatal_deprecations = false +# +# From oslo.messaging +# + +# Size of RPC connection pool. (integer value) +# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size +#rpc_conn_pool_size = 30 + +# ZeroMQ bind address. Should be a wildcard (*), an ethernet +# interface, or IP. The "host" option should point or resolve to this +# address. (string value) +#rpc_zmq_bind_address = * + +# MatchMaker driver. (string value) +#rpc_zmq_matchmaker = redis + +# Use REQ/REP pattern for all methods CALL/CAST/FANOUT. (boolean +# value) +#rpc_zmq_all_req_rep = true + +# Type of concurrency used. Either "native" or "eventlet" (string +# value) +#rpc_zmq_concurrency = eventlet + +# Number of ZeroMQ contexts, defaults to 1. (integer value) +#rpc_zmq_contexts = 1 + +# Maximum number of ingress messages to locally buffer per topic. +# Default is unlimited. (integer value) +#rpc_zmq_topic_backlog = + +# Directory for holding IPC sockets. (string value) +#rpc_zmq_ipc_dir = /var/run/openstack + +# Name of this node. Must be a valid hostname, FQDN, or IP address. +# Must match "host" option, if running Nova. (string value) +#rpc_zmq_host = localhost + +# Seconds to wait before a cast expires (TTL). Only supported by +# impl_zmq. (integer value) +#rpc_cast_timeout = 30 + +# The default number of seconds that poll should wait. Poll raises +# timeout exception when timeout expired. (integer value) +#rpc_poll_timeout = 1 + +# Shows whether zmq-messaging uses broker or not. (boolean value) +#zmq_use_broker = true + +# Minimal port number for random ports range. (integer value) +#rpc_zmq_min_port = 49152 + +# Maximal port number for random ports range. (integer value) +#rpc_zmq_max_port = 65536 + +# Number of retries to find free port number before fail with +# ZMQBindError. (integer value) +#rpc_zmq_bind_port_retries = 100 + +# Host to locate redis. (string value) +#host = 127.0.0.1 + +# Use this port to connect to redis host. (integer value) +#port = 6379 + +# Password for Redis server (optional). (string value) +#password = + +# Size of executor thread pool. (integer value) +# Deprecated group/name - [DEFAULT]/rpc_thread_pool_size +#executor_thread_pool_size = 64 + +# The Drivers(s) to handle sending notifications. Possible values are +# messaging, messagingv2, routing, log, test, noop (multi valued) +#notification_driver = + +# A URL representing the messaging driver to use for notifications. If +# not set, we fall back to the same configuration used for RPC. +# (string value) +#notification_transport_url = + +# AMQP topic used for OpenStack notifications. (list value) +# Deprecated group/name - [rpc_notifier2]/topics +#notification_topics = notifications + +# Seconds to wait for a response from a call. (integer value) +#rpc_response_timeout = 60 + +# A URL representing the messaging driver to use and its full +# configuration. If not set, we fall back to the rpc_backend option +# and driver specific configuration. (string value) +#transport_url = + +# The messaging driver to use, defaults to rabbit. Other drivers +# include qpid and zmq. (string value) +#rpc_backend = rabbit + +# The default exchange under which topics are scoped. May be +# overridden by an exchange name specified in the transport_url +# option. (string value) +#control_exchange = openstack + [api] @@ -407,6 +509,294 @@ #admin_tenant_name = admin +[matchmaker_redis] + +# +# From oslo.messaging +# + +# Host to locate redis. (string value) +#host = 127.0.0.1 + +# Use this port to connect to redis host. (integer value) +#port = 6379 + +# Password for Redis server (optional). (string value) +#password = + + +[oslo_messaging_amqp] + +# +# From oslo.messaging +# + +# address prefix used when sending to a specific server (string value) +# Deprecated group/name - [amqp1]/server_request_prefix +#server_request_prefix = exclusive + +# address prefix used when broadcasting to all servers (string value) +# Deprecated group/name - [amqp1]/broadcast_prefix +#broadcast_prefix = broadcast + +# address prefix when sending to any server in group (string value) +# Deprecated group/name - [amqp1]/group_request_prefix +#group_request_prefix = unicast + +# Name for the AMQP container (string value) +# Deprecated group/name - [amqp1]/container_name +#container_name = + +# Timeout for inactive connections (in seconds) (integer value) +# Deprecated group/name - [amqp1]/idle_timeout +#idle_timeout = 0 + +# Debug: dump AMQP frames to stdout (boolean value) +# Deprecated group/name - [amqp1]/trace +#trace = false + +# CA certificate PEM file to verify server certificate (string value) +# Deprecated group/name - [amqp1]/ssl_ca_file +#ssl_ca_file = + +# Identifying certificate PEM file to present to clients (string +# value) +# Deprecated group/name - [amqp1]/ssl_cert_file +#ssl_cert_file = + +# Private key PEM file used to sign cert_file certificate (string +# value) +# Deprecated group/name - [amqp1]/ssl_key_file +#ssl_key_file = + +# Password for decrypting ssl_key_file (if encrypted) (string value) +# Deprecated group/name - [amqp1]/ssl_key_password +#ssl_key_password = + +# Accept clients using either SSL or plain TCP (boolean value) +# Deprecated group/name - [amqp1]/allow_insecure_clients +#allow_insecure_clients = false + +# Space separated list of acceptable SASL mechanisms (string value) +# Deprecated group/name - [amqp1]/sasl_mechanisms +#sasl_mechanisms = + +# Path to directory that contains the SASL configuration (string +# value) +# Deprecated group/name - [amqp1]/sasl_config_dir +#sasl_config_dir = + +# Name of configuration file (without .conf suffix) (string value) +# Deprecated group/name - [amqp1]/sasl_config_name +#sasl_config_name = + +# User name for message broker authentication (string value) +# Deprecated group/name - [amqp1]/username +#username = + +# Password for message broker authentication (string value) +# Deprecated group/name - [amqp1]/password +#password = + + +[oslo_messaging_qpid] + +# +# From oslo.messaging +# + +# Use durable queues in AMQP. (boolean value) +# Deprecated group/name - [DEFAULT]/amqp_durable_queues +# Deprecated group/name - [DEFAULT]/rabbit_durable_queues +#amqp_durable_queues = false + +# Auto-delete queues in AMQP. (boolean value) +# Deprecated group/name - [DEFAULT]/amqp_auto_delete +#amqp_auto_delete = false + +# Send a single AMQP reply to call message. The current behaviour +# since oslo-incubator is to send two AMQP replies - first one with +# the payload, a second one to ensure the other have finish to send +# the payload. We are going to remove it in the N release, but we must +# keep backward compatible at the same time. This option provides such +# compatibility - it defaults to False in Liberty and can be turned on +# for early adopters with a new installations or for testing. Please +# note, that this option will be removed in the Mitaka release. +# (boolean value) +#send_single_reply = false + +# Qpid broker hostname. (string value) +# Deprecated group/name - [DEFAULT]/qpid_hostname +#qpid_hostname = localhost + +# Qpid broker port. (integer value) +# Deprecated group/name - [DEFAULT]/qpid_port +#qpid_port = 5672 + +# Qpid HA cluster host:port pairs. (list value) +# Deprecated group/name - [DEFAULT]/qpid_hosts +#qpid_hosts = $qpid_hostname:$qpid_port + +# Username for Qpid connection. (string value) +# Deprecated group/name - [DEFAULT]/qpid_username +#qpid_username = + +# Password for Qpid connection. (string value) +# Deprecated group/name - [DEFAULT]/qpid_password +#qpid_password = + +# Space separated list of SASL mechanisms to use for auth. (string +# value) +# Deprecated group/name - [DEFAULT]/qpid_sasl_mechanisms +#qpid_sasl_mechanisms = + +# Seconds between connection keepalive heartbeats. (integer value) +# Deprecated group/name - [DEFAULT]/qpid_heartbeat +#qpid_heartbeat = 60 + +# Transport to use, either 'tcp' or 'ssl'. (string value) +# Deprecated group/name - [DEFAULT]/qpid_protocol +#qpid_protocol = tcp + +# Whether to disable the Nagle algorithm. (boolean value) +# Deprecated group/name - [DEFAULT]/qpid_tcp_nodelay +#qpid_tcp_nodelay = true + +# The number of prefetched messages held by receiver. (integer value) +# Deprecated group/name - [DEFAULT]/qpid_receiver_capacity +#qpid_receiver_capacity = 1 + +# The qpid topology version to use. Version 1 is what was originally +# used by impl_qpid. Version 2 includes some backwards-incompatible +# changes that allow broker federation to work. Users should update +# to version 2 when they are able to take everything down, as it +# requires a clean break. (integer value) +# Deprecated group/name - [DEFAULT]/qpid_topology_version +#qpid_topology_version = 1 + + +[oslo_messaging_rabbit] + +# +# From oslo.messaging +# + +# Use durable queues in AMQP. (boolean value) +# Deprecated group/name - [DEFAULT]/amqp_durable_queues +# Deprecated group/name - [DEFAULT]/rabbit_durable_queues +#amqp_durable_queues = false + +# Auto-delete queues in AMQP. (boolean value) +# Deprecated group/name - [DEFAULT]/amqp_auto_delete +#amqp_auto_delete = false + +# Send a single AMQP reply to call message. The current behaviour +# since oslo-incubator is to send two AMQP replies - first one with +# the payload, a second one to ensure the other have finish to send +# the payload. We are going to remove it in the N release, but we must +# keep backward compatible at the same time. This option provides such +# compatibility - it defaults to False in Liberty and can be turned on +# for early adopters with a new installations or for testing. Please +# note, that this option will be removed in the Mitaka release. +# (boolean value) +#send_single_reply = false + +# SSL version to use (valid only if SSL enabled). Valid values are +# TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be +# available on some distributions. (string value) +# Deprecated group/name - [DEFAULT]/kombu_ssl_version +#kombu_ssl_version = + +# SSL key file (valid only if SSL enabled). (string value) +# Deprecated group/name - [DEFAULT]/kombu_ssl_keyfile +#kombu_ssl_keyfile = + +# SSL cert file (valid only if SSL enabled). (string value) +# Deprecated group/name - [DEFAULT]/kombu_ssl_certfile +#kombu_ssl_certfile = + +# SSL certification authority file (valid only if SSL enabled). +# (string value) +# Deprecated group/name - [DEFAULT]/kombu_ssl_ca_certs +#kombu_ssl_ca_certs = + +# How long to wait before reconnecting in response to an AMQP consumer +# cancel notification. (floating point value) +# Deprecated group/name - [DEFAULT]/kombu_reconnect_delay +#kombu_reconnect_delay = 1.0 + +# How long to wait before considering a reconnect attempt to have +# failed. This value should not be longer than rpc_response_timeout. +# (integer value) +#kombu_reconnect_timeout = 60 + +# The RabbitMQ broker address where a single node is used. (string +# value) +# Deprecated group/name - [DEFAULT]/rabbit_host +#rabbit_host = localhost + +# The RabbitMQ broker port where a single node is used. (integer +# value) +# Deprecated group/name - [DEFAULT]/rabbit_port +#rabbit_port = 5672 + +# RabbitMQ HA cluster host:port pairs. (list value) +# Deprecated group/name - [DEFAULT]/rabbit_hosts +#rabbit_hosts = $rabbit_host:$rabbit_port + +# Connect over SSL for RabbitMQ. (boolean value) +# Deprecated group/name - [DEFAULT]/rabbit_use_ssl +#rabbit_use_ssl = false + +# The RabbitMQ userid. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_userid +#rabbit_userid = guest + +# The RabbitMQ password. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_password +#rabbit_password = guest + +# The RabbitMQ login method. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_login_method +#rabbit_login_method = AMQPLAIN + +# The RabbitMQ virtual host. (string value) +# Deprecated group/name - [DEFAULT]/rabbit_virtual_host +#rabbit_virtual_host = / + +# How frequently to retry connecting with RabbitMQ. (integer value) +#rabbit_retry_interval = 1 + +# How long to backoff for between retries when connecting to RabbitMQ. +# (integer value) +# Deprecated group/name - [DEFAULT]/rabbit_retry_backoff +#rabbit_retry_backoff = 2 + +# Maximum number of RabbitMQ connection retries. Default is 0 +# (infinite retry count). (integer value) +# Deprecated group/name - [DEFAULT]/rabbit_max_retries +#rabbit_max_retries = 0 + +# Use HA queues in RabbitMQ (x-ha-policy: all). If you change this +# option, you must wipe the RabbitMQ database. (boolean value) +# Deprecated group/name - [DEFAULT]/rabbit_ha_queues +#rabbit_ha_queues = false + +# Number of seconds after which the Rabbit broker is considered down +# if heartbeat's keep-alive fails (0 disable the heartbeat). +# EXPERIMENTAL (integer value) +#heartbeat_timeout_threshold = 60 + +# How often times during the heartbeat_timeout_threshold we check the +# heartbeat. (integer value) +#heartbeat_rate = 2 + +# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake +# (boolean value) +# Deprecated group/name - [DEFAULT]/fake_rabbit +#fake_rabbit = false + + [watcher_applier] # @@ -458,36 +848,3 @@ # strategy (for example: BASIC_CONSOLIDATION:basic, # MY_GOAL:my_strategy_1) (dict value) #goals = DUMMY:dummy - - -[watcher_messaging] - -# -# From watcher -# - -# The name of the driver used by oslo messaging (string value) -#notifier_driver = messaging - -# The name of a message executor, forexample: eventlet, blocking -# (string value) -#executor = blocking - -# The protocol used by the message broker, for example rabbit (string -# value) -#protocol = rabbit - -# The username used by the message broker (string value) -#user = guest - -# The password of user used by the message broker (string value) -#password = guest - -# The host where the message brokeris installed (string value) -#host = localhost - -# The port used bythe message broker (string value) -#port = 5672 - -# The virtual host used by the message broker (string value) -#virtual_host = diff --git a/tox.ini b/tox.ini index f03e7c9b4..b680e4bc0 100644 --- a/tox.ini +++ b/tox.ini @@ -34,6 +34,7 @@ commands = --namespace keystonemiddleware.auth_token \ --namespace oslo.log \ --namespace oslo.db \ + --namespace oslo.messaging \ --output-file etc/watcher/watcher.conf.sample [flake8] diff --git a/watcher/applier/framework/manager_applier.py b/watcher/applier/framework/manager_applier.py index c335c3491..883732b27 100644 --- a/watcher/applier/framework/manager_applier.py +++ b/watcher/applier/framework/manager_applier.py @@ -69,13 +69,15 @@ CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token', class ApplierManager(MessagingCore): - API_VERSION = '1.0' # todo(jed) need workflow def __init__(self): - MessagingCore.__init__(self, CONF.watcher_applier.publisher_id, - CONF.watcher_applier.topic_control, - CONF.watcher_applier.topic_status) + super(ApplierManager, self).__init__( + CONF.watcher_applier.publisher_id, + CONF.watcher_applier.topic_control, + CONF.watcher_applier.topic_status, + api_version=self.API_VERSION, + ) # shared executor of the workflow self.executor = ThreadPoolExecutor(max_workers=1) self.handler = NotificationHandler(self.publisher_id) diff --git a/watcher/applier/framework/rpcapi.py b/watcher/applier/framework/rpcapi.py index 4368f5faf..89485a1ce 100644 --- a/watcher/applier/framework/rpcapi.py +++ b/watcher/applier/framework/rpcapi.py @@ -24,13 +24,10 @@ import oslo_messaging as om from watcher.applier.framework.manager_applier import APPLIER_MANAGER_OPTS from watcher.applier.framework.manager_applier import opt_group from watcher.common import exception -from watcher.common import utils - - from watcher.common.messaging.messaging_core import MessagingCore from watcher.common.messaging.notification_handler import NotificationHandler -from watcher.common.messaging.utils.transport_url_builder import \ - TransportUrlBuilder +from watcher.common import utils + LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -39,19 +36,23 @@ CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) class ApplierAPI(MessagingCore): - MessagingCore.API_VERSION = '1.0' def __init__(self): - MessagingCore.__init__(self, CONF.watcher_applier.publisher_id, - CONF.watcher_applier.topic_control, - CONF.watcher_applier.topic_status) + super(ApplierAPI, self).__init__( + CONF.watcher_applier.publisher_id, + CONF.watcher_applier.topic_control, + CONF.watcher_applier.topic_status, + api_version=self.API_VERSION, + ) self.handler = NotificationHandler(self.publisher_id) self.handler.register_observer(self) self.topic_status.add_endpoint(self.handler) - transport = om.get_transport(CONF, TransportUrlBuilder().url) + transport = om.get_transport(CONF) + target = om.Target( topic=CONF.watcher_applier.topic_control, - version=MessagingCore.API_VERSION) + version=self.API_VERSION, + ) self.client = om.RPCClient(transport, target, serializer=self.serializer) diff --git a/watcher/common/messaging/messaging_core.py b/watcher/common/messaging/messaging_core.py index 6e9b6155e..1b5804354 100644 --- a/watcher/common/messaging/messaging_core.py +++ b/watcher/common/messaging/messaging_core.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from oslo_config import cfg from oslo_log import log from watcher.common.messaging.events.event_dispatcher import \ @@ -28,60 +27,33 @@ from watcher.objects.base import WatcherObjectSerializer LOG = log.getLogger(__name__) CONF = cfg.CONF -WATCHER_MESSAGING_OPTS = [ - cfg.StrOpt('notifier_driver', - default='messaging', help='The name of the driver used by' - ' oslo messaging'), - cfg.StrOpt('executor', - default='blocking', help='The name of a message executor, for' - 'example: eventlet, blocking'), - cfg.StrOpt('protocol', - default='rabbit', help='The protocol used by the message' - ' broker, for example rabbit'), - cfg.StrOpt('user', - default='guest', help='The username used by the message ' - 'broker'), - cfg.StrOpt('password', - default='guest', help='The password of user used by the ' - 'message broker'), - cfg.StrOpt('host', - default='localhost', help='The host where the message broker' - 'is installed'), - cfg.StrOpt('port', - default='5672', help='The port used bythe message broker'), - cfg.StrOpt('virtual_host', - default='', help='The virtual host used by the message ' - 'broker') -] - -CONF = cfg.CONF -opt_group = cfg.OptGroup(name='watcher_messaging', - title='Options for the messaging core') -CONF.register_group(opt_group) -CONF.register_opts(WATCHER_MESSAGING_OPTS, opt_group) - class MessagingCore(EventDispatcher): + API_VERSION = '1.0' - def __init__(self, publisher_id, topic_control, topic_status): - EventDispatcher.__init__(self) + def __init__(self, publisher_id, topic_control, topic_status, + api_version=API_VERSION): + super(MessagingCore, self).__init__() self.serializer = RequestContextSerializer(WatcherObjectSerializer()) self.publisher_id = publisher_id + self.api_version = api_version self.topic_control = self.build_topic(topic_control) self.topic_status = self.build_topic(topic_status) def build_topic(self, topic_name): return MessagingHandler(self.publisher_id, topic_name, self, - self.API_VERSION, self.serializer) + self.api_version, self.serializer) def connect(self): - LOG.debug("connecting to rabbitMQ broker") + LOG.debug("Connecting to '%s' (%s)", + CONF.transport_url, CONF.rpc_backend) self.topic_control.start() self.topic_status.start() def disconnect(self): - LOG.debug("Disconnect to rabbitMQ broker") + LOG.debug("Disconnecting from '%s' (%s)", + CONF.transport_url, CONF.rpc_backend) self.topic_control.stop() self.topic_status.stop() @@ -92,12 +64,12 @@ class MessagingCore(EventDispatcher): return self.topic_status.publish_event(event, payload, request_id) def get_version(self): - return self.API_VERSION + return self.api_version def check_api_version(self, context): api_manager_version = self.client.call( context.to_dict(), 'check_api_version', - api_version=self.API_VERSION) + api_version=self.api_version) return api_manager_version def response(self, evt, ctx, message): diff --git a/watcher/common/messaging/messaging_handler.py b/watcher/common/messaging/messaging_handler.py index b283f2933..af2010438 100644 --- a/watcher/common/messaging/messaging_handler.py +++ b/watcher/common/messaging/messaging_handler.py @@ -14,35 +14,42 @@ # See the License for the specific language governing permissions and # limitations under the License. +import socket + import eventlet from oslo_config import cfg from oslo_log import log import oslo_messaging as om from threading import Thread -from watcher.common.messaging.utils.transport_url_builder import \ - TransportUrlBuilder from watcher.common.rpc import JsonPayloadSerializer from watcher.common.rpc import RequestContextSerializer +# NOTE: +# Ubuntu 14.04 forces librabbitmq when kombu is used +# Unfortunately it forces a version that has a crash +# bug. Calling eventlet.monkey_patch() tells kombu +# to use libamqp instead. eventlet.monkey_patch() -LOG = log.getLogger(__name__) +LOG = log.getLogger(__name__) CONF = cfg.CONF class MessagingHandler(Thread): + def __init__(self, publisher_id, topic_watcher, endpoint, version, serializer=None): - Thread.__init__(self) + super(MessagingHandler, self).__init__() + self.publisher_id = publisher_id + self.topic_watcher = topic_watcher + self.__endpoints = [] + self.__serializer = serializer + self.__version = version + self.__server = None self.__notifier = None - self.__endpoints = [] - self.__topics = [] - self._publisher_id = publisher_id - self._topic_watcher = topic_watcher - self.__endpoints.append(endpoint) - self.__version = version - self.__serializer = serializer + self.__transport = None + self.add_endpoint(endpoint) def add_endpoint(self, endpoint): self.__endpoints.append(endpoint) @@ -51,47 +58,50 @@ class MessagingHandler(Thread): if endpoint in self.__endpoints: self.__endpoints.remove(endpoint) + @property + def endpoints(self): + return self.__endpoints + + @property + def transport(self): + return self.__transport + def build_notifier(self): serializer = RequestContextSerializer(JsonPayloadSerializer()) return om.Notifier( - self.transport, - driver=CONF.watcher_messaging.notifier_driver, - publisher_id=self._publisher_id, - topic=self._topic_watcher, - serializer=serializer) + self.__transport, + publisher_id=self.publisher_id, + topic=self.topic_watcher, + serializer=serializer + ) - def build_server(self, targets): - - return om.get_rpc_server(self.transport, targets, + def build_server(self, target): + return om.get_rpc_server(self.__transport, target, self.__endpoints, - executor=CONF. - watcher_messaging.executor, serializer=self.__serializer) - def __build_transport_url(self): - return TransportUrlBuilder().url - - def __config(self): + def _configure(self): try: - self.transport = om.get_transport( - cfg.CONF, - url=self.__build_transport_url()) + self.__transport = om.get_transport(CONF) self.__notifier = self.build_notifier() - if 0 < len(self.__endpoints): - targets = om.Target( - topic=self._topic_watcher, - server=CONF.watcher_messaging.host, - version=self.__version) - self.__server = self.build_server(targets) + if len(self.__endpoints): + target = om.Target( + topic=self.topic_watcher, + # For compatibility, we can override it with 'host' opt + server=CONF.host or socket.getfqdn(), + version=self.__version, + ) + self.__server = self.build_server(target) else: - LOG.warn("you have no defined endpoint, \ - so you can only publish events") + LOG.warn("you have no defined endpoint, " + "so you can only publish events") except Exception as e: + LOG.exception(e) LOG.error("configure : %s" % str(e.message)) def run(self): - LOG.debug("configure MessagingHandler for %s" % self._topic_watcher) - self.__config() + LOG.debug("configure MessagingHandler for %s" % self.topic_watcher) + self._configure() if len(self.__endpoints) > 0: LOG.debug("Starting up server") self.__server.start() @@ -102,6 +112,8 @@ class MessagingHandler(Thread): self.__server.stop() def publish_event(self, event_type, payload, request_id=None): - self.__notifier.info({'version_api': self.__version, - 'request_id': request_id}, - {'event_id': event_type}, payload) + self.__notifier.info( + {'version_api': self.__version, + 'request_id': request_id}, + {'event_id': event_type}, payload + ) diff --git a/watcher/common/messaging/utils/transport_url_builder.py b/watcher/common/messaging/utils/transport_url_builder.py deleted file mode 100644 index 62791a2af..000000000 --- a/watcher/common/messaging/utils/transport_url_builder.py +++ /dev/null @@ -1,35 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_config import cfg -from oslo_log import log - -LOG = log.getLogger(__name__) -CONF = cfg.CONF - - -class TransportUrlBuilder(object): - - @property - def url(self): - return "%s://%s:%s@%s:%s/%s" % ( - CONF.watcher_messaging.protocol, - CONF.watcher_messaging.user, - CONF.watcher_messaging.password, - CONF.watcher_messaging.host, - CONF.watcher_messaging.port, - CONF.watcher_messaging.virtual_host - ) diff --git a/watcher/common/rpc_service.py b/watcher/common/rpc_service.py deleted file mode 100644 index 864c9d1d3..000000000 --- a/watcher/common/rpc_service.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2014 - Rackspace Hosting -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Common RPC service and API tools for Watcher.""" - -import eventlet -from oslo_config import cfg -import oslo_messaging as messaging - -from watcher.common import context as watcher_context -from watcher.common import rpc -from watcher.objects import base as objects_base - - -# NOTE(paulczar): -# Ubuntu 14.04 forces librabbitmq when kombu is used -# Unfortunately it forces a version that has a crash -# bug. Calling eventlet.monkey_patch() tells kombu -# to use libamqp instead. -eventlet.monkey_patch() - -# NOTE(asalkeld): -# The watcher.openstack.common.rpc entries are for compatability -# with devstack rpc_backend configuration values. -TRANSPORT_ALIASES = { - 'watcher.openstack.common.rpc.impl_kombu': 'rabbit', - 'watcher.openstack.common.rpc.impl_qpid': 'qpid', - 'watcher.openstack.common.rpc.impl_zmq': 'zmq', -} - - -class RequestContextSerializer(messaging.Serializer): - - def __init__(self, base): - self._base = base - - def serialize_entity(self, context, entity): - if not self._base: - return entity - return self._base.serialize_entity(context, entity) - - def deserialize_entity(self, context, entity): - if not self._base: - return entity - return self._base.deserialize_entity(context, entity) - - def serialize_context(self, context): - return context.to_dict() - - def deserialize_context(self, context): - return watcher_context.RequestContext.from_dict(context) - - -class Service(object): - _server = None - - def __init__(self, topic, server, handlers): - serializer = RequestContextSerializer( - objects_base.WatcherObjectSerializer()) - transport = messaging.get_transport(cfg.CONF, - aliases=TRANSPORT_ALIASES) - # TODO(asalkeld) add support for version='x.y' - target = messaging.Target(topic=topic, server=server) - self._server = messaging.get_rpc_server(transport, target, handlers, - serializer=serializer) - - def serve(self): - self._server.start() - self._server.wait() - - -class API(object): - def __init__(self, transport=None, context=None, topic=None): - serializer = RequestContextSerializer( - objects_base.WatcherObjectSerializer()) - if transport is None: - exmods = rpc.get_allowed_exmods() - transport = messaging.get_transport(cfg.CONF, - allowed_remote_exmods=exmods, - aliases=TRANSPORT_ALIASES) - self._context = context - if topic is None: - topic = '' - target = messaging.Target(topic=topic) - self._client = messaging.RPCClient(transport, target, - serializer=serializer) - - def _call(self, method, *args, **kwargs): - # import pdb; pdb.set_trace() - return self._client.call(self._context, method, *args, **kwargs) - - def _cast(self, method, *args, **kwargs): - self._client.cast(self._context, method, *args, **kwargs) - - def echo(self, message): - self._cast('echo', message=message) diff --git a/watcher/decision_engine/manager.py b/watcher/decision_engine/manager.py index 83d30e26c..a16782c04 100644 --- a/watcher/decision_engine/manager.py +++ b/watcher/decision_engine/manager.py @@ -57,12 +57,14 @@ CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) class DecisionEngineManager(MessagingCore): - API_VERSION = '1.0' def __init__(self): - MessagingCore.__init__(self, CONF.watcher_decision_engine.publisher_id, - CONF.watcher_decision_engine.topic_control, - CONF.watcher_decision_engine.topic_status) + super(DecisionEngineManager, self).__init__( + CONF.watcher_decision_engine.publisher_id, + CONF.watcher_decision_engine.topic_control, + CONF.watcher_decision_engine.topic_status, + api_version=self.API_VERSION, + ) self.handler = NotificationHandler(self.publisher_id) self.handler.register_observer(self) self.add_event_listener(Events.ALL, self.event_receive) diff --git a/watcher/decision_engine/rpcapi.py b/watcher/decision_engine/rpcapi.py index 889aa1959..c51e96304 100644 --- a/watcher/decision_engine/rpcapi.py +++ b/watcher/decision_engine/rpcapi.py @@ -17,19 +17,14 @@ # limitations under the License. # - from oslo_config import cfg from oslo_log import log import oslo_messaging as om from watcher.common import exception -from watcher.common import utils - - from watcher.common.messaging.messaging_core import MessagingCore from watcher.common.messaging.notification_handler import NotificationHandler -from watcher.common.messaging.utils.transport_url_builder import \ - TransportUrlBuilder +from watcher.common import utils from watcher.decision_engine.event.consumer_factory import EventConsumerFactory from watcher.decision_engine.manager import decision_engine_opt_group from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS @@ -44,23 +39,24 @@ CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) class DecisionEngineAPI(MessagingCore): - # This must be in sync with manager.DecisionEngineManager's. - MessagingCore.API_VERSION = '1.0' def __init__(self): - MessagingCore.__init__(self, CONF.watcher_decision_engine.publisher_id, - CONF.watcher_decision_engine.topic_control, - CONF.watcher_decision_engine.topic_status) + super(DecisionEngineAPI, self).__init__( + CONF.watcher_decision_engine.publisher_id, + CONF.watcher_decision_engine.topic_control, + CONF.watcher_decision_engine.topic_status, + api_version=self.API_VERSION, + ) self.handler = NotificationHandler(self.publisher_id) self.handler.register_observer(self) self.add_event_listener(Events.ALL, self.event_receive) self.topic_status.add_endpoint(self.handler) - transport = om.get_transport(CONF, TransportUrlBuilder().url) + transport = om.get_transport(CONF) target = om.Target( - exchange='watcher', topic=CONF.watcher_decision_engine.topic_control, - version=MessagingCore.API_VERSION) + version=self.API_VERSION, + ) self.client = om.RPCClient(transport, target, serializer=self.serializer) diff --git a/watcher/opts.py b/watcher/opts.py index 7a23f1df1..57d135017 100644 --- a/watcher/opts.py +++ b/watcher/opts.py @@ -17,8 +17,6 @@ import watcher.api.app from watcher.applier.framework import manager_applier -import watcher.common.messaging.messaging_core - from watcher.decision_engine import manager from watcher.decision_engine.strategy.selector import default \ as strategy_selector @@ -27,8 +25,6 @@ from watcher.decision_engine.strategy.selector import default \ def list_opts(): return [ ('api', watcher.api.app.API_SERVICE_OPTS), - ('watcher_messaging', - watcher.common.messaging.messaging_core.WATCHER_MESSAGING_OPTS), ('watcher_goals', strategy_selector.WATCHER_GOALS_OPTS), ('watcher_decision_engine', manager.WATCHER_DECISION_ENGINE_OPTS), diff --git a/watcher/tests/common/messaging/test_messaging_core.py b/watcher/tests/common/messaging/test_messaging_core.py index d233bb4df..cde501e76 100644 --- a/watcher/tests/common/messaging/test_messaging_core.py +++ b/watcher/tests/common/messaging/test_messaging_core.py @@ -15,63 +15,74 @@ # limitations under the License. -import mock -from oslo_config import cfg +from mock import patch from watcher.common.messaging.messaging_core import MessagingCore from watcher.common.messaging.messaging_handler import MessagingHandler from watcher.common.rpc import RequestContextSerializer -from watcher.tests import base - -CONF = cfg.CONF +from watcher.tests.base import TestCase -class TestMessagingCore(base.TestCase): - messaging = MessagingCore("", "", "") +class TestMessagingCore(TestCase): - def fake_topic_name(self): - topic_name = "MyTopic" - return topic_name + def setUp(self): + super(TestMessagingCore, self).setUp() def test_build_topic(self): - topic_name = self.fake_topic_name() - messaging_handler = self.messaging.build_topic(topic_name) + topic_name = "MyTopic" + messaging = MessagingCore("", "", "") + messaging_handler = messaging.build_topic(topic_name) self.assertIsNotNone(messaging_handler) def test_init_messaging_core(self): - self.assertIsInstance(self.messaging.serializer, + messaging = MessagingCore("", "", "") + self.assertIsInstance(messaging.serializer, RequestContextSerializer) - self.assertIsInstance(self.messaging.topic_control, MessagingHandler) - self.assertIsInstance(self.messaging.topic_status, MessagingHandler) + self.assertIsInstance(messaging.topic_control, MessagingHandler) + self.assertIsInstance(messaging.topic_status, MessagingHandler) - def test_publish_control(self): - with mock.patch.object(MessagingCore, 'publish_control') as mock_call: - payload = { - "name": "value", - } - event = "MyEvent" - self.messaging.publish_control(event, payload) - mock_call.assert_called_once_with(event, payload) + @patch.object(MessagingCore, 'publish_control') + def test_publish_control(self, mock_call): + payload = { + "name": "value", + } + event = "MyEvent" + messaging = MessagingCore("", "", "") + messaging.publish_control(event, payload) + mock_call.assert_called_once_with(event, payload) - def test_publish_status(self): - with mock.patch.object(MessagingCore, 'publish_status') as mock_call: - payload = { - "name": "value", - } - event = "MyEvent" - self.messaging.publish_status(event, payload) - mock_call.assert_called_once_with(event, payload) + @patch.object(MessagingCore, 'publish_status') + def test_publish_status(self, mock_call): + payload = { + "name": "value", + } + event = "MyEvent" + messaging = MessagingCore("", "", "") + messaging.publish_status(event, payload) + mock_call.assert_called_once_with(event, payload) - def test_response(self): - with mock.patch.object(MessagingCore, 'publish_status') as mock_call: - event = "My event" - context = {'request_id': 12} - message = "My Message" + @patch.object(MessagingCore, 'publish_status') + def test_response(self, mock_call): + event = "My event" + context = {'request_id': 12} + message = "My Message" - self.messaging.response(event, context, message) + messaging = MessagingCore("", "", "") + messaging.response(event, context, message) - expected_payload = { - 'request_id': context['request_id'], - 'msg': message - } - mock_call.assert_called_once_with(event, expected_payload) + expected_payload = { + 'request_id': context['request_id'], + 'msg': message + } + mock_call.assert_called_once_with(event, expected_payload) + + def test_messaging_build_topic(self): + messaging = MessagingCore("pub_id", "test_topic", "does not matter") + topic = messaging.build_topic("test_topic") + + self.assertIsInstance(topic, MessagingHandler) + self.assertEqual(messaging.publisher_id, "pub_id") + self.assertEqual(topic.publisher_id, "pub_id") + + self.assertEqual(messaging.topic_control.topic_watcher, "test_topic") + self.assertEqual(topic.topic_watcher, "test_topic") diff --git a/watcher/tests/common/messaging/test_messaging_handler.py b/watcher/tests/common/messaging/test_messaging_handler.py new file mode 100644 index 000000000..8afb29d7a --- /dev/null +++ b/watcher/tests/common/messaging/test_messaging_handler.py @@ -0,0 +1,78 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock +from mock import patch +from oslo_config import cfg +import oslo_messaging as messaging +from watcher.common.messaging.messaging_handler import MessagingHandler +from watcher.tests.base import TestCase + +CONF = cfg.CONF + + +class TestMessagingHandler(TestCase): + + PUBLISHER_ID = 'TEST_API' + TOPIC_WATCHER = 'TEST_TOPIC_WATCHER' + ENDPOINT = 'http://fake-fqdn:1337' + VERSION = "1.0" + + def setUp(self): + super(TestMessagingHandler, self).setUp() + CONF.set_default('host', 'fake-fqdn') + + @patch.object(messaging, "get_rpc_server") + @patch.object(messaging, "Target") + def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server): + m_target = Mock() + m_target_cls.return_value = m_target + messaging_handler = MessagingHandler( + publisher_id=self.PUBLISHER_ID, + topic_watcher=self.TOPIC_WATCHER, + endpoint=self.ENDPOINT, + version=self.VERSION, + serializer=None, + ) + + messaging_handler.run() + + m_target_cls.assert_called_once_with( + server="fake-fqdn", + topic="TEST_TOPIC_WATCHER", + version="1.0", + ) + m_get_rpc_server.assert_called_once_with( + messaging_handler.transport, + m_target, + [self.ENDPOINT], + serializer=None, + ) + + def test_messaging_handler_remove_endpoint(self): + messaging_handler = MessagingHandler( + publisher_id=self.PUBLISHER_ID, + topic_watcher=self.TOPIC_WATCHER, + endpoint=self.ENDPOINT, + version=self.VERSION, + serializer=None, + ) + + self.assertEqual(messaging_handler.endpoints, [self.ENDPOINT]) + + messaging_handler.remove_endpoint(self.ENDPOINT) + + self.assertEqual(messaging_handler.endpoints, []) diff --git a/watcher/tests/common/messaging/utils/__init__.py b/watcher/tests/common/messaging/utils/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/tests/common/messaging/utils/test_transport_url_builder.py b/watcher/tests/common/messaging/utils/test_transport_url_builder.py deleted file mode 100644 index 49d17c667..000000000 --- a/watcher/tests/common/messaging/utils/test_transport_url_builder.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from oslo_config import cfg -import re -from watcher.common.messaging.utils.transport_url_builder import \ - TransportUrlBuilder -from watcher.tests import base - -CONF = cfg.CONF - - -class TestTransportUrlBuilder(base.TestCase): - - def setUp(self): - super(TestTransportUrlBuilder, self).setUp() - - def test_transport_url_not_none(self): - url = TransportUrlBuilder().url - self.assertIsNotNone(url, "The transport url must not be none") - - def test_transport_url_valid_pattern(self): - url = TransportUrlBuilder().url - url_pattern = r'(\D+)://(\D+):(\D+)@(\D+):(\d+)' - pattern = re.compile(url_pattern) - match = re.search(url_pattern, url) - self.assertEqual('rabbit', match.group(1)) - self.assertEqual('guest', match.group(2)) - self.assertEqual('guest', match.group(3)) - self.assertEqual('localhost', match.group(4)) - self.assertEqual('5672', match.group(5)) - self.assertIsNotNone(pattern.match(url)) diff --git a/watcher/tests/conf_fixture.py b/watcher/tests/conf_fixture.py index da50a6dac..d33006124 100644 --- a/watcher/tests/conf_fixture.py +++ b/watcher/tests/conf_fixture.py @@ -34,7 +34,6 @@ class ConfFixture(fixtures.Fixture): def setUp(self): super(ConfFixture, self).setUp() - self.conf.set_default('host', 'fake-mini') self.conf.set_default('connection', "sqlite://", group='database') self.conf.set_default('sqlite_synchronous', False, group='database') self.conf.set_default('verbose', True) diff --git a/watcher/tests/decision_engine/test_rpcapi.py b/watcher/tests/decision_engine/test_rpcapi.py index f7250543b..048dd45ad 100644 --- a/watcher/tests/decision_engine/test_rpcapi.py +++ b/watcher/tests/decision_engine/test_rpcapi.py @@ -40,7 +40,7 @@ class TestDecisionEngineAPI(base.TestCase): mock_call.assert_called_once_with( expected_context.to_dict(), 'check_api_version', - api_version=DecisionEngineAPI().API_VERSION) + api_version=DecisionEngineAPI().api_version) def test_execute_audit_throw_exception(self): audit_uuid = "uuid"