From d62c4967bd5d5da60c247b810ae7f8651ff2effc Mon Sep 17 00:00:00 2001 From: Alexander Chadin Date: Mon, 12 Mar 2018 14:07:50 +0300 Subject: [PATCH] Sync CDM among Decision Engines by using notification pool This commit allows to consume notifications via notifications pools[1]. Listeners in notification pools recieves a copy of notification. It will let Watcher to sync Data Models of Decision Engines. [1]: https://docs.openstack.org/oslo.messaging/ocata/notification_listener.html Change-Id: Ie37528263181924f84510500fc1277b0237c1df8 Partially-Implements: blueprint support-watcher-ha-active-active-mode --- .zuul.yaml | 2 +- watcher/common/service.py | 11 +++++++++-- watcher/conf/decision_engine.py | 9 ++++++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/.zuul.yaml b/.zuul.yaml index 6dd11c2f7..9c6c83227 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -73,7 +73,7 @@ live_migration_uri: 'qemu+ssh://root@%s/system' devstack_services: watcher-api: false - watcher-decision-engine: false + watcher-decision-engine: true watcher-applier: false # We need to add TLS support for watcher plugin tls-proxy: false diff --git a/watcher/common/service.py b/watcher/common/service.py index f2778c639..ea269fbef 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -251,11 +251,18 @@ class Service(service.ServiceBase): def build_notification_handler(self, topic_names, endpoints=()): serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) - targets = [om.Target(topic=topic_name) for topic_name in topic_names] + targets = [] + for topic in topic_names: + kwargs = {} + if '.' in topic: + exchange, topic = topic.split('.') + kwargs['exchange'] = exchange + kwargs['topic'] = topic + targets.append(om.Target(**kwargs)) return om.get_notification_listener( self.notification_transport, targets, endpoints, executor='eventlet', serializer=serializer, - allow_requeue=False) + allow_requeue=False, pool=CONF.host) def start(self): LOG.debug("Connecting to '%s' (%s)", diff --git a/watcher/conf/decision_engine.py b/watcher/conf/decision_engine.py index f2ceb3a38..811cba9f9 100644 --- a/watcher/conf/decision_engine.py +++ b/watcher/conf/decision_engine.py @@ -30,9 +30,12 @@ WATCHER_DECISION_ENGINE_OPTS = [ 'control events, this topic ' 'used for RPC calls'), cfg.ListOpt('notification_topics', - default=['versioned_notifications', 'watcher_notifications'], - help='The topic names from which notification events ' - 'will be listened to'), + default=['nova.versioned_notifications', + 'watcher.watcher_notifications'], + help='The exchange and topic names from which ' + 'notification events will be listened to. ' + 'The exchange should be specified to get ' + 'an ability to use pools.'), cfg.StrOpt('publisher_id', default='watcher.decision.api', help='The identifier used by the Watcher '