Merge "Sync CDM among Decision Engines by using notification pool"

This commit is contained in:
Zuul
2018-07-26 07:44:52 +00:00
committed by Gerrit Code Review
3 changed files with 16 additions and 6 deletions

View File

@@ -59,7 +59,7 @@
live_migration_uri: 'qemu+ssh://root@%s/system' live_migration_uri: 'qemu+ssh://root@%s/system'
devstack_services: devstack_services:
watcher-api: false watcher-api: false
watcher-decision-engine: false watcher-decision-engine: true
watcher-applier: false watcher-applier: false
# We need to add TLS support for watcher plugin # We need to add TLS support for watcher plugin
tls-proxy: false tls-proxy: false

View File

@@ -251,11 +251,18 @@ class Service(service.ServiceBase):
def build_notification_handler(self, topic_names, endpoints=()): def build_notification_handler(self, topic_names, endpoints=()):
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
targets = [om.Target(topic=topic_name) for topic_name in topic_names] targets = []
for topic in topic_names:
kwargs = {}
if '.' in topic:
exchange, topic = topic.split('.')
kwargs['exchange'] = exchange
kwargs['topic'] = topic
targets.append(om.Target(**kwargs))
return om.get_notification_listener( return om.get_notification_listener(
self.notification_transport, targets, endpoints, self.notification_transport, targets, endpoints,
executor='eventlet', serializer=serializer, executor='eventlet', serializer=serializer,
allow_requeue=False) allow_requeue=False, pool=CONF.host)
def start(self): def start(self):
LOG.debug("Connecting to '%s' (%s)", LOG.debug("Connecting to '%s' (%s)",

View File

@@ -30,9 +30,12 @@ WATCHER_DECISION_ENGINE_OPTS = [
'control events, this topic ' 'control events, this topic '
'used for RPC calls'), 'used for RPC calls'),
cfg.ListOpt('notification_topics', cfg.ListOpt('notification_topics',
default=['versioned_notifications', 'watcher_notifications'], default=['nova.versioned_notifications',
help='The topic names from which notification events ' 'watcher.watcher_notifications'],
'will be listened to'), help='The exchange and topic names from which '
'notification events will be listened to. '
'The exchange should be specified to get '
'an ability to use pools.'),
cfg.StrOpt('publisher_id', cfg.StrOpt('publisher_id',
default='watcher.decision.api', default='watcher.decision.api',
help='The identifier used by the Watcher ' help='The identifier used by the Watcher '