Sync CDM among Decision Engines by using notification pool

This commit allows to consume notifications via notifications pools[1].
Listeners in notification pools recieves a copy of notification. It
will let Watcher to sync Data Models of Decision Engines.

[1]: https://docs.openstack.org/oslo.messaging/ocata/notification_listener.html

Change-Id: Ie37528263181924f84510500fc1277b0237c1df8
Partially-Implements: blueprint support-watcher-ha-active-active-mode
This commit is contained in:
Alexander Chadin
2018-03-12 14:07:50 +03:00
parent cfaab0cbdc
commit d62c4967bd
3 changed files with 16 additions and 6 deletions

View File

@@ -73,7 +73,7 @@
live_migration_uri: 'qemu+ssh://root@%s/system'
devstack_services:
watcher-api: false
watcher-decision-engine: false
watcher-decision-engine: true
watcher-applier: false
# We need to add TLS support for watcher plugin
tls-proxy: false

View File

@@ -251,11 +251,18 @@ class Service(service.ServiceBase):
def build_notification_handler(self, topic_names, endpoints=()):
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
targets = [om.Target(topic=topic_name) for topic_name in topic_names]
targets = []
for topic in topic_names:
kwargs = {}
if '.' in topic:
exchange, topic = topic.split('.')
kwargs['exchange'] = exchange
kwargs['topic'] = topic
targets.append(om.Target(**kwargs))
return om.get_notification_listener(
self.notification_transport, targets, endpoints,
executor='eventlet', serializer=serializer,
allow_requeue=False)
allow_requeue=False, pool=CONF.host)
def start(self):
LOG.debug("Connecting to '%s' (%s)",

View File

@@ -30,9 +30,12 @@ WATCHER_DECISION_ENGINE_OPTS = [
'control events, this topic '
'used for RPC calls'),
cfg.ListOpt('notification_topics',
default=['versioned_notifications', 'watcher_notifications'],
help='The topic names from which notification events '
'will be listened to'),
default=['nova.versioned_notifications',
'watcher.watcher_notifications'],
help='The exchange and topic names from which '
'notification events will be listened to. '
'The exchange should be specified to get '
'an ability to use pools.'),
cfg.StrOpt('publisher_id',
default='watcher.decision.api',
help='The identifier used by the Watcher '