Merge "Notification and CDM partial update"

This commit is contained in:
Jenkins
2016-08-26 17:01:22 +00:00
committed by Gerrit Code Review
60 changed files with 2154 additions and 240 deletions

View File

@@ -78,8 +78,7 @@ class AuditHandler(BaseAuditHandler):
event.data = {}
payload = {'audit_uuid': audit_uuid,
'audit_status': status}
self.messaging.status_topic_handler.publish_event(
event.type.name, payload)
self.messaging.publish_status_event(event.type.name, payload)
def update_audit_state(self, request_context, audit, state):
LOG.debug("Update audit state: %s", state)

View File

@@ -15,7 +15,6 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This component is responsible for computing a set of potential optimization
@@ -40,6 +39,7 @@ See :doc:`../architecture` for more details on this component.
from oslo_config import cfg
from watcher.decision_engine.messaging import audit_endpoint
from watcher.decision_engine.model.collector import manager
CONF = cfg.CONF
@@ -57,6 +57,10 @@ WATCHER_DECISION_ENGINE_OPTS = [
'is used so as to notify'
'the others components '
'of the system'),
cfg.ListOpt('notification_topics',
default=['versioned_notifications', 'watcher_notifications'],
help='The topic names from which notification events '
'will be listened to'),
cfg.StrOpt('publisher_id',
default='watcher.decision.api',
help='The identifier used by the Watcher '
@@ -65,8 +69,7 @@ WATCHER_DECISION_ENGINE_OPTS = [
default=2,
required=True,
help='The maximum number of threads that can be used to '
'execute strategies',
),
'execute strategies'),
]
decision_engine_opt_group = cfg.OptGroup(name='watcher_decision_engine',
title='Defines the parameters of '
@@ -79,11 +82,19 @@ class DecisionEngineManager(object):
API_VERSION = '1.0'
conductor_endpoints = [audit_endpoint.AuditEndpoint]
status_endpoints = []
def __init__(self):
self.api_version = self.API_VERSION
self.publisher_id = CONF.watcher_decision_engine.publisher_id
self.conductor_topic = CONF.watcher_decision_engine.conductor_topic
self.status_topic = CONF.watcher_decision_engine.status_topic
self.api_version = self.API_VERSION
self.notification_topics = (
CONF.watcher_decision_engine.notification_topics)
self.conductor_endpoints = [audit_endpoint.AuditEndpoint]
self.status_endpoints = []
self.collector_manager = manager.CollectorManager()
self.notification_endpoints = (
self.collector_manager.get_notification_endpoints())

View File

@@ -30,6 +30,7 @@ LOG = log.getLogger(__name__)
class AuditEndpoint(object):
def __init__(self, messaging):
self._messaging = messaging
self._executor = futures.ThreadPoolExecutor(

View File

@@ -139,6 +139,15 @@ class BaseClusterDataModelCollector(loadable.LoadableSingleton):
self._cluster_data_model = model
self.lock.release()
@abc.abstractproperty
def notification_endpoints(self):
"""Associated notification endpoints
:return: Associated notification endpoints
:rtype: List of :py:class:`~.EventsNotificationEndpoint` instances
"""
raise NotImplementedError()
def set_cluster_data_model_as_stale(self):
self.cluster_data_model = self.STALE_MODEL

View File

@@ -31,6 +31,7 @@ class CollectorManager(object):
def __init__(self):
self.collector_loader = default.ClusterDataModelCollectorLoader()
self._collectors = None
self._notification_endpoints = None
def get_collectors(self):
if self._collectors is None:
@@ -43,6 +44,15 @@ class CollectorManager(object):
return self._collectors
def get_notification_endpoints(self):
if self._notification_endpoints is None:
endpoints = []
for collector in self.get_collectors().values():
endpoints.extend(collector.notification_endpoints)
self._notification_endpoints = endpoints
return self._notification_endpoints
def get_cluster_model_collector(self, name, osc=None):
"""Retrieve cluster data model collector

View File

@@ -22,6 +22,7 @@ from watcher.common import nova_helper
from watcher.decision_engine.model.collector import base
from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root
from watcher.decision_engine.model.notification import nova
LOG = log.getLogger(__name__)
@@ -43,6 +44,26 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
super(NovaClusterDataModelCollector, self).__init__(config, osc)
self.wrapper = nova_helper.NovaHelper(osc=self.osc)
@property
def notification_endpoints(self):
"""Associated notification endpoints
:return: Associated notification endpoints
:rtype: List of :py:class:`~.EventsNotificationEndpoint` instances
"""
return [
nova.ServiceUpdated(self),
nova.InstanceCreated(self),
nova.InstanceUpdated(self),
nova.InstanceDeletedEnd(self),
nova.LegacyInstanceCreatedEnd(self),
nova.LegacyInstanceUpdated(self),
nova.LegacyInstanceDeletedEnd(self),
nova.LegacyLiveMigratedEnd(self),
]
def execute(self):
"""Build the compute cluster data model"""
LOG.debug("Building latest Nova cluster data model")
@@ -87,7 +108,6 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
disk.set_capacity(instance, v.flavor['disk'])
num_cores.set_capacity(instance, v.flavor['vcpus'])
model.get_mapping().map(node, instance)
model.add_instance(instance)
model.map_instance(instance, node)
return model

View File

@@ -22,7 +22,6 @@ from watcher.decision_engine.model.element import node
from watcher.decision_engine.model.element import resource
ServiceState = node.ServiceState
PowerState = node.PowerState
ComputeNode = node.ComputeNode
InstanceState = instance.InstanceState
@@ -35,5 +34,5 @@ Resource = resource.Resource
__all__ = [
'ServiceState', 'PowerState', 'ComputeNode', 'InstanceState', 'Instance',
'ServiceState', 'ComputeNode', 'InstanceState', 'Instance',
'DiskInfo', 'ResourceType', 'Resource']

View File

@@ -26,31 +26,12 @@ class ServiceState(enum.Enum):
DISABLED = 'disabled'
class PowerState(enum.Enum):
# away mode
g0 = "g0"
# power on suspend (processor caches are flushed)
# The power to the CPU(s) and RAM is maintained
g1_S1 = "g1_S1"
# CPU powered off. Dirty cache is flushed to RAM
g1_S2 = "g1_S2"
# Suspend to RAM
g1_S3 = "g1_S3"
# Suspend to Disk
g1_S4 = "g1_S4"
# switch outlet X OFF on the PDU (Power Distribution Unit)
switch_off = "switch_off"
# switch outlet X ON on the PDU (Power Distribution Unit)
switch_on = "switch_on"
class ComputeNode(compute_resource.ComputeResource):
def __init__(self):
super(ComputeNode, self).__init__()
self._state = ServiceState.ONLINE
self._status = ServiceState.ENABLED
self._power_state = PowerState.g0
self._state = ServiceState.ONLINE.value
self._status = ServiceState.ENABLED.value
def accept(self, visitor):
raise NotImplementedError()
@@ -70,11 +51,3 @@ class ComputeNode(compute_resource.ComputeResource):
@status.setter
def status(self, s):
self._status = s
@property
def powerstate(self):
return self._power_state
@powerstate.setter
def powerstate(self, p):
self._power_state = p

View File

@@ -47,6 +47,9 @@ class Resource(object):
def set_capacity(self, element, value):
self.mapping[element.uuid] = value
def unset_capacity(self, element):
del self.mapping[element.uuid]
def get_capacity_from_id(self, uuid):
if str(uuid) in self.mapping.keys():
return self.mapping[str(uuid)]

View File

@@ -36,17 +36,15 @@ class Mapping(object):
:param node: the node
:param instance: the virtual machine or instance
"""
try:
self.lock.acquire()
# init first
if node.uuid not in self.compute_node_mapping.keys():
self.compute_node_mapping[node.uuid] = []
self.compute_node_mapping[node.uuid] = set()
# map node => instances
self.compute_node_mapping[node.uuid].append(
instance.uuid)
self.compute_node_mapping[node.uuid].add(instance.uuid)
# map instance => node
self.instance_mapping[instance.uuid] = node.uuid
@@ -60,7 +58,6 @@ class Mapping(object):
:param node: the node
:param instance: the virtual machine or instance
"""
self.unmap_from_id(node.uuid, instance.uuid)
def unmap_from_id(self, node_uuid, instance_uuid):
@@ -68,7 +65,6 @@ class Mapping(object):
:rtype : object
"""
try:
self.lock.acquire()
if str(node_uuid) in self.compute_node_mapping:
@@ -77,9 +73,9 @@ class Mapping(object):
# remove instance
self.instance_mapping.pop(instance_uuid)
else:
LOG.warning(_LW(
"Trying to delete the instance %(instance)s but it was "
"not found on node %(node)s"),
LOG.warning(
_LW("Trying to delete the instance %(instance)s but it "
"was not found on node %(node)s") %
{'instance': instance_uuid, 'node': node_uuid})
finally:
self.lock.release()
@@ -96,7 +92,6 @@ class Mapping(object):
:param instance: the uuid of the instance
:return: node
"""
return self.model.get_node_from_id(
self.instance_mapping[str(instance_uuid)])
@@ -113,21 +108,4 @@ class Mapping(object):
return self.compute_node_mapping[str(node_uuid)]
else:
# empty
return []
def migrate_instance(self, instance, source_node, destination_node):
"""Migrate single instance from source_node to destination_node
:param instance:
:param source_node:
:param destination_node:
:return:
"""
if source_node == destination_node:
return False
# unmap
self.unmap(source_node, instance)
# map
self.map(destination_node, instance)
return True
return set()

View File

@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from watcher._i18n import _
from watcher.common import exception
from watcher.common import utils
@@ -51,7 +53,7 @@ class ModelRoot(object):
def remove_node(self, node):
self.assert_node(node)
if str(node.uuid) not in self._nodes:
raise exception.ComputeNodeNotFound(node.uuid)
raise exception.ComputeNodeNotFound(name=node.uuid)
else:
del self._nodes[node.uuid]
@@ -59,12 +61,75 @@ class ModelRoot(object):
self.assert_instance(instance)
self._instances[instance.uuid] = instance
def remove_instance(self, instance):
self.assert_instance(instance)
del self._instances[instance.uuid]
def map_instance(self, instance, node):
"""Map a newly created instance to a node
:param instance: :py:class:`~.Instance` object or instance UUID
:type instance: str or :py:class:`~.Instance`
:param node: :py:class:`~.ComputeNode` object or node UUID
:type node: str or :py:class:`~.Instance`
"""
if isinstance(instance, six.string_types):
instance = self.get_instance_from_id(instance)
if isinstance(node, six.string_types):
node = self.get_node_from_id(node)
self.add_instance(instance)
self.mapping.map(node, instance)
def unmap_instance(self, instance, node):
"""Unmap an instance from a node
:param instance: :py:class:`~.Instance` object or instance UUID
:type instance: str or :py:class:`~.Instance`
:param node: :py:class:`~.ComputeNode` object or node UUID
:type node: str or :py:class:`~.Instance`
"""
if isinstance(instance, six.string_types):
instance = self.get_instance_from_id(instance)
if isinstance(node, six.string_types):
node = self.get_node_from_id(node)
self.add_instance(instance)
self.mapping.unmap(node, instance)
def delete_instance(self, instance, node):
self.remove_instance(instance)
self.mapping.unmap(node, instance)
for resource in self.resource.values():
try:
resource.unset_capacity(instance)
except KeyError:
pass
def migrate_instance(self, instance, source_node, destination_node):
"""Migrate single instance from source_node to destination_node
:param instance:
:param source_node:
:param destination_node:
:return:
"""
if source_node == destination_node:
return False
# unmap
self.mapping.unmap(source_node, instance)
# map
self.mapping.map(destination_node, instance)
return True
def get_all_compute_nodes(self):
return self._nodes
def get_node_from_id(self, node_uuid):
if str(node_uuid) not in self._nodes:
raise exception.ComputeNodeNotFound(node_uuid)
raise exception.ComputeNodeNotFound(name=node_uuid)
return self._nodes[str(node_uuid)]
def get_instance_from_id(self, uuid):
@@ -72,6 +137,17 @@ class ModelRoot(object):
raise exception.InstanceNotFound(name=uuid)
return self._instances[str(uuid)]
def get_node_from_instance_id(self, instance_uuid):
"""Getting host information from the guest instance
:param instance_uuid: the uuid of the instance
:return: node
"""
if str(instance_uuid) not in self.mapping.instance_mapping:
raise exception.InstanceNotFound(name=instance_uuid)
return self.get_node_from_id(
self.mapping.instance_mapping[str(instance_uuid)])
def get_all_instances(self):
return self._instances

View File

@@ -0,0 +1,51 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Authors: Vincent FRANCOISE <Vincent.FRANCOISE@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from oslo_log import log
from watcher.common import rpc
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class NotificationEndpoint(object):
    """Base class for notification endpoints.

    An endpoint is attached to a cluster data model collector and is
    intended to react to incoming notifications in order to keep the
    collector's cluster data model up to date.
    """

    def __init__(self, collector):
        """:param collector: cluster data model collector this endpoint
                          updates (exposed via `cluster_data_model`)."""
        super(NotificationEndpoint, self).__init__()
        self.collector = collector
        # Lazily-created RPC notifier; see the `notifier` property below.
        self._notifier = None

    @abc.abstractproperty
    def filter_rule(self):
        """Notification Filter

        Subclasses must return the filter selecting which incoming
        notifications this endpoint handles.
        """
        raise NotImplementedError()

    @property
    def cluster_data_model(self):
        # Convenience accessor for the collector's in-memory model.
        return self.collector.cluster_data_model

    @property
    def notifier(self):
        # Create the notifier on first access and cache it.
        if self._notifier is None:
            self._notifier = rpc.get_notifier('decision-engine')
        return self._notifier

View File

@@ -0,0 +1,91 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Authors: Vincent FRANCOISE <Vincent.FRANCOISE@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from oslo_log import log
import oslo_messaging as om
import six
LOG = log.getLogger(__name__)
class NotificationFilter(om.NotificationFilter):
    """Notification filter with nested, regex-based payload rules.

    Extends the oslo.messaging ``NotificationFilter`` so that filter
    values may be, at any nesting level of the payload:

    - a regex string (compiled and matched against the payload value),
    - a dict of sub-rules (applied recursively to a sub-payload),
    - a callable taking the payload value and returning True on mismatch,
    - or None (acts as a wildcard).
    """

    def _build_regex_dict(self, regex_list):
        """Recursively normalize a rule mapping.

        Strings are compiled to regex objects, dicts are processed
        recursively, callables and None are kept as-is.

        :param regex_list: dict of rules (or None for "no rules")
        :return: dict mirroring `regex_list` with compiled values
        """
        if regex_list is None:
            return {}

        regex_mapping = {}
        for key, value in regex_list.items():
            if isinstance(value, dict):
                regex_mapping[key] = self._build_regex_dict(value)
            else:
                if callable(value):
                    # Custom predicate: expected to return True on mismatch.
                    regex_mapping[key] = value
                elif value is not None:
                    regex_mapping[key] = re.compile(value)
                else:
                    # None acts as a wildcard.
                    regex_mapping[key] = None
        return regex_mapping

    def _check_for_mismatch(self, data, regex):
        """Return True if `data` does not satisfy the rule `regex`.

        NOTE(review): in the dict branch, each element of
        `mismatch_results` is "key missing OR sub-rule MATCHES" (note the
        `not` before the recursive call), and `all(...)` of that is
        returned as a mismatch — this reads as inverted relative to the
        other branches; confirm against the oslo.messaging
        ``NotificationFilter`` contract before relying on nested dict
        rules.
        """
        if isinstance(regex, dict):
            mismatch_results = [
                k not in data or not self._check_for_mismatch(data[k], v)
                for k, v in regex.items()
            ]
            # An empty rule dict matches anything.
            if not mismatch_results:
                return False

            return all(mismatch_results)
        elif callable(regex):
            # The filter is a callable that should return True
            # if there is a mismatch
            return regex(data)
        elif regex is not None and data is None:
            # A concrete rule can never match a missing value.
            return True
        elif (regex is not None and
              isinstance(data, six.string_types) and
              not regex.match(data)):
            return True

        return False

View File

@@ -0,0 +1,334 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Authors: Vincent FRANCOISE <Vincent.FRANCOISE@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log
from watcher._i18n import _LI
from watcher.common import exception
from watcher.decision_engine.model import element
from watcher.decision_engine.model.notification import base
from watcher.decision_engine.model.notification import filtering
LOG = log.getLogger(__name__)
class NovaNotification(base.NotificationEndpoint):
    """Base endpoint for Nova notifications.

    Provides the helpers shared by every Nova notification handler to
    keep the cluster data model (CDM) in sync with incoming events.
    """

    def get_or_create_instance(self, uuid):
        """Return the instance from the CDM, creating it if unknown.

        :param uuid: UUID of the instance
        :return: the (possibly newly created) instance object
        """
        try:
            instance = self.cluster_data_model.get_instance_from_id(uuid)
        except exception.InstanceNotFound:
            # The instance didn't exist yet so we create a new instance object
            LOG.debug("New instance created: %s", uuid)
            instance = element.Instance()
            instance.uuid = uuid
            self.cluster_data_model.add_instance(instance)

        return instance

    def update_instance(self, instance, data):
        """Update an instance from a versioned notification payload.

        :param instance: the instance object to update
        :param data: the payload of a versioned Nova notification
        """
        instance_data = data['nova_object.data']
        instance_flavor_data = instance_data['flavor']['nova_object.data']

        instance.state = instance_data['state']
        instance.hostname = instance_data['host_name']
        instance.human_id = instance_data['display_name']

        memory_mb = instance_flavor_data['memory_mb']
        num_cores = instance_flavor_data['vcpus']
        disk_gb = instance_flavor_data['root_gb']

        self.update_capacity(element.ResourceType.memory, instance, memory_mb)
        self.update_capacity(
            element.ResourceType.cpu_cores, instance, num_cores)
        self.update_capacity(
            element.ResourceType.disk, instance, disk_gb)

        node = self.get_or_create_node(instance_data['host'])
        self.update_instance_mapping(instance, node)

    def update_capacity(self, resource_id, obj, value):
        """Set the capacity of `obj` for the given resource type."""
        resource = self.cluster_data_model.get_resource_from_id(resource_id)
        resource.set_capacity(obj, value)

    def legacy_update_instance(self, instance, data):
        """Update an instance from a legacy (unversioned) payload.

        :param instance: the instance object to update
        :param data: the flat payload of a legacy Nova notification
        """
        instance.state = data['state']
        instance.hostname = data['hostname']
        instance.human_id = data['display_name']

        memory_mb = data['memory_mb']
        num_cores = data['vcpus']
        disk_gb = data['root_gb']

        self.update_capacity(element.ResourceType.memory, instance, memory_mb)
        self.update_capacity(
            element.ResourceType.cpu_cores, instance, num_cores)
        self.update_capacity(
            element.ResourceType.disk, instance, disk_gb)

        node = self.get_or_create_node(data['host'])
        self.update_instance_mapping(instance, node)

    def get_or_create_node(self, uuid):
        """Return the compute node from the CDM, creating it if unknown.

        :param uuid: node identifier; may be None when Nova did not
                     provide a host (e.g. an unscheduled instance)
        :return: the compute node object, or None when `uuid` is None
        """
        if uuid is None:
            LOG.debug("Compute node UUID not provided: skipping")
            return
        try:
            node = self.cluster_data_model.get_node_from_id(uuid)
        except exception.ComputeNodeNotFound:
            # The node didn't exist yet so we create a new node object
            LOG.debug("New compute node created: %s", uuid)
            node = element.ComputeNode()
            node.uuid = uuid
            self.cluster_data_model.add_node(node)
        return node

    def update_instance_mapping(self, instance, node):
        """(Re-)map `instance` onto `node`.

        If the instance is already mapped on a different node, it is
        unmapped from it first; in every case it ends up mapped on `node`.

        :param instance: the instance object to (re-)map
        :param node: the compute node the instance now lives on (falsy to
                     skip, e.g. when the payload carried no host)
        """
        if not node:
            LOG.debug("Instance %s not yet attached to any node: skipping",
                      instance.uuid)
            return
        try:
            # Bug fix: look up the node the instance is *currently* mapped
            # on. The previous code called self.get_or_create_node(node.uuid)
            # which always returned `node` itself, so the unmap branch and
            # the InstanceNotFound handler below were unreachable.
            old_node = self.cluster_data_model.get_node_from_instance_id(
                instance.uuid)
            LOG.debug("Mapped node %s found", old_node.uuid)
            if node != old_node:
                LOG.debug("Unmapping instance %s from %s",
                          instance.uuid, old_node.uuid)
                self.cluster_data_model.unmap_instance(instance, old_node)
        except exception.InstanceNotFound:
            # The instance didn't exist yet so we map it for the first time
            LOG.debug("New instance: mapping it to %s", node.uuid)
        finally:
            # `node` is guaranteed truthy here (early return above).
            self.cluster_data_model.map_instance(instance, node)
            LOG.debug("Mapped instance %s to %s", instance.uuid, node.uuid)

    def delete_instance(self, instance, node):
        """Remove `instance` from the CDM (best-effort, never raises)."""
        try:
            self.cluster_data_model.delete_instance(instance, node)
        except Exception as exc:
            LOG.exception(exc)
            LOG.info(_LI("Instance %s already deleted"), instance.uuid)
class VersionnedNotificationEndpoint(NovaNotification):
    # Versioned notifications have publisher ids starting with
    # 'nova-compute'.
    # NOTE(review): 'Versionned' is a misspelling of 'Versioned'; the name
    # is kept because subclasses reference it.
    publisher_id_regex = r'^nova-compute.*'
class UnversionnedNotificationEndpoint(NovaNotification):
    # Legacy (unversioned) notifications have publisher ids starting with
    # 'compute'.
    publisher_id_regex = r'^compute.*'
class ServiceUpdated(VersionnedNotificationEndpoint):
    """Handles Nova 'service.update' versioned notifications."""

    @property
    def filter_rule(self):
        """Nova service.update notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='service.update',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Sync the compute node's state/status in the CDM from the
        # service payload.
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 dict(event=event_type, publisher=publisher_id))
        node_data = payload['nova_object.data']
        # NOTE(review): the payload 'host' field is used as the node UUID
        # here — presumably the hostname acts as the node identifier in
        # the CDM; confirm against the collector.
        node_uuid = node_data['host']
        node = self.get_or_create_node(node_uuid)
        node.hostname = node_data['host']
        node.state = (
            element.ServiceState.OFFLINE.value
            if node_data['forced_down'] else element.ServiceState.ONLINE.value)
        # NOTE(review): 'host' is always truthy, so status is always set to
        # DISABLED — this likely should test node_data['disabled'] instead;
        # verify against the Nova ServiceStatusPayload schema.
        node.status = (
            element.ServiceState.DISABLED.value
            if node_data['host'] else element.ServiceState.ENABLED.value)
class InstanceCreated(VersionnedNotificationEndpoint):
    """Handles the notification marking an instance as fully created.

    Listens to 'instance.update' events but filters on the
    building -> active state transition.
    """

    @property
    def filter_rule(self):
        """Nova instance.update notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='instance.update',
            # To be "fully" created, an instance transitions
            # from the 'building' state to the 'active' one.
            # See http://docs.openstack.org/developer/nova/vmstates.html
            payload={
                'nova_object.data': {
                    'state': element.InstanceState.ACTIVE.value,
                    'state_update': {
                        'nova_object.data': {
                            'old_state': element.InstanceState.BUILDING.value,
                            'state': element.InstanceState.ACTIVE.value,
                        },
                        'nova_object.name': 'InstanceStateUpdatePayload',
                        'nova_object.namespace': 'nova',
                    },
                }
            }
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Create (or fetch) the instance in the CDM and sync its data.
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 dict(event=event_type, publisher=publisher_id))
        instance_data = payload['nova_object.data']
        instance_uuid = instance_data['uuid']
        instance = self.get_or_create_instance(instance_uuid)

        self.update_instance(instance, payload)
class InstanceUpdated(VersionnedNotificationEndpoint):
    """Handles Nova 'instance.update' versioned notifications."""

    @staticmethod
    def _match_not_new_instance_state(data):
        # Predicate meant to exclude the 'building' -> 'active' transition
        # (which is handled by InstanceCreated).
        # NOTE(review): this helper is not referenced by filter_rule below,
        # so creation events currently pass through this endpoint too —
        # confirm whether it should be wired into the payload filter.
        is_new_instance = (
            data['old_state'] == element.InstanceState.BUILDING.value and
            data['state'] == element.InstanceState.ACTIVE.value)

        return not is_new_instance

    @property
    def filter_rule(self):
        """Nova instance.update notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='instance.update',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Refresh the instance's state, capacities and mapping in the CDM.
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 dict(event=event_type, publisher=publisher_id))
        instance_data = payload['nova_object.data']
        instance_uuid = instance_data['uuid']
        instance = self.get_or_create_instance(instance_uuid)

        self.update_instance(instance, payload)
class InstanceDeletedEnd(VersionnedNotificationEndpoint):
    """Handles Nova 'instance.delete.end' versioned notifications."""

    @property
    def filter_rule(self):
        """Nova instance.delete.end notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='instance.delete.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Remove the deleted instance from the CDM (best-effort).
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 dict(event=event_type, publisher=publisher_id))
        instance_data = payload['nova_object.data']
        instance_uuid = instance_data['uuid']
        instance = self.get_or_create_instance(instance_uuid)

        node = self.get_or_create_node(instance_data['host'])
        self.delete_instance(instance, node)
class LegacyInstanceUpdated(UnversionnedNotificationEndpoint):
    """Handles legacy (unversioned) 'compute.instance.update' events."""

    @property
    def filter_rule(self):
        """Nova compute.instance.update notification filter"""
        return filtering.NotificationFilter(
            event_type='compute.instance.update',
            publisher_id=self.publisher_id_regex,
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Log the event, then refresh the matching instance in the CDM
        # from the flat legacy payload.
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 {'event': event_type, 'publisher': publisher_id})
        updated_instance = self.get_or_create_instance(payload['instance_id'])
        self.legacy_update_instance(updated_instance, payload)
class LegacyInstanceCreatedEnd(UnversionnedNotificationEndpoint):
    """Handles legacy (unversioned) 'compute.instance.create.end' events."""

    @property
    def filter_rule(self):
        """Nova compute.instance.create.end notification filter"""
        return filtering.NotificationFilter(
            event_type='compute.instance.create.end',
            publisher_id=self.publisher_id_regex,
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Log the event, then register the new instance in the CDM from
        # the flat legacy payload.
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 {'event': event_type, 'publisher': publisher_id})
        created_instance = self.get_or_create_instance(payload['instance_id'])
        self.legacy_update_instance(created_instance, payload)
class LegacyInstanceDeletedEnd(UnversionnedNotificationEndpoint):
    """Handles legacy 'compute.instance.delete.end' notifications."""

    @property
    def filter_rule(self):
        """Nova compute.instance.delete.end notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='compute.instance.delete.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Remove the deleted instance from the CDM (best-effort).
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 dict(event=event_type, publisher=publisher_id))
        instance_uuid = payload['instance_id']
        instance = self.get_or_create_instance(instance_uuid)

        node = self.get_or_create_node(payload['host'])
        self.delete_instance(instance, node)
class LegacyLiveMigratedEnd(UnversionnedNotificationEndpoint):
    """Handles legacy live-migration completion notifications."""

    @property
    def filter_rule(self):
        """Nova *.live_migration.post.dest.end notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='compute.instance.live_migration.post.dest.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Refresh the migrated instance; presumably the payload 'host'
        # now carries the destination node, so the legacy update remaps
        # the instance there — TODO confirm against the Nova payload.
        LOG.info(_LI("Event '%(event)s' received from %(publisher)s") %
                 dict(event=event_type, publisher=publisher_id))
        instance_uuid = payload['instance_id']
        instance = self.get_or_create_instance(instance_uuid)

        self.legacy_update_instance(instance, payload)

View File

@@ -52,6 +52,8 @@ class DecisionEngineAPIManager(object):
conductor_endpoints = []
status_endpoints = [notification_handler.NotificationHandler]
notification_endpoints = []
notification_topics = []
def __init__(self):
self.publisher_id = CONF.watcher_decision_engine.publisher_id

View File

@@ -209,24 +209,24 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
"""
return self.migration_attempts
def calculate_weight(self, node, total_cores_used, total_disk_used,
total_memory_used):
def calculate_weight(self, compute_resource, total_cores_used,
total_disk_used, total_memory_used):
"""Calculate weight of every resource
:param element:
:param compute_resource:
:param total_cores_used:
:param total_disk_used:
:param total_memory_used:
:return:
"""
cpu_capacity = self.compute_model.get_resource_from_id(
element.ResourceType.cpu_cores).get_capacity(node)
element.ResourceType.cpu_cores).get_capacity(compute_resource)
disk_capacity = self.compute_model.get_resource_from_id(
element.ResourceType.disk).get_capacity(node)
element.ResourceType.disk).get_capacity(compute_resource)
memory_capacity = self.compute_model.get_resource_from_id(
element.ResourceType.memory).get_capacity(node)
element.ResourceType.memory).get_capacity(compute_resource)
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
float(cpu_capacity))
@@ -261,10 +261,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
if host_avg_cpu_util is None:
LOG.error(
_LE("No values returned by %(resource_id)s "
"for %(metric_name)s"),
resource_id=resource_id,
metric_name=self.HOST_CPU_USAGE_METRIC_NAME,
)
"for %(metric_name)s") % dict(
resource_id=resource_id,
metric_name=self.HOST_CPU_USAGE_METRIC_NAME))
host_avg_cpu_util = 100
cpu_capacity = self.compute_model.get_resource_from_id(
@@ -302,10 +301,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
if instance_cpu_utilization is None:
LOG.error(
_LE("No values returned by %(resource_id)s "
"for %(metric_name)s"),
resource_id=instance.uuid,
metric_name=self.INSTANCE_CPU_USAGE_METRIC_NAME,
)
"for %(metric_name)s") % dict(
resource_id=instance.uuid,
metric_name=self.INSTANCE_CPU_USAGE_METRIC_NAME))
instance_cpu_utilization = 100
cpu_capacity = self.compute_model.get_resource_from_id(
@@ -335,18 +333,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
def score_of_nodes(self, score):
"""Calculate score of nodes based on load by VMs"""
for node_id in self.compute_model.get_all_compute_nodes():
node = self.compute_model. \
get_node_from_id(node_id)
count = self.compute_model.get_mapping(). \
get_node_instances_from_id(node_id)
for node in self.compute_model.get_all_compute_nodes().values():
count = self.compute_model.mapping.get_node_instances_from_id(
node.uuid)
if len(count) > 0:
result = self.calculate_score_node(node)
else:
            # The node has no VMs
result = 0
if len(count) > 0:
score.append((node_id, result))
score.append((node.uuid, result))
return score
def node_and_instance_score(self, sorted_score, score):
@@ -368,7 +364,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
def create_migration_instance(self, mig_instance, mig_source_node,
mig_destination_node):
"""Create migration VM"""
if self.compute_model.get_mapping().migrate_instance(
if self.compute_model.migrate_instance(
mig_instance, mig_source_node, mig_destination_node):
self.add_migration(mig_instance.uuid, 'live',
mig_source_node.uuid,
@@ -427,14 +423,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
self.compute_attempts(size_cluster)
for node_id in self.compute_model.get_all_compute_nodes():
node = self.compute_model.get_node_from_id(node_id)
count = self.compute_model.get_mapping(). \
get_node_instances_from_id(node_id)
if len(count) == 0:
for node_uuid, node in self.compute_model.get_all_compute_nodes(
).items():
node_instances = (self.compute_model.mapping
.get_node_instances_from_id(node_uuid))
if node_instances:
if node.state == element.ServiceState.ENABLED:
self.add_change_service_state(
node_id, element.ServiceState.DISABLED.value)
node_uuid, element.ServiceState.DISABLED.value)
while self.get_allowed_migration_attempts() >= unsuccessful_migration:
if not first_migration:

View File

@@ -272,7 +272,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
# always use the host with lowerest outlet temperature
mig_destination_node = dest_servers[0]['node']
# generate solution to migrate the instance to the dest server,
if self.compute_model.mapping.migrate_instance(
if self.compute_model.migrate_instance(
instance_src, mig_source_node, mig_destination_node):
parameters = {'migration_type': 'live',
'source_node': mig_source_node.uuid,

View File

@@ -333,7 +333,7 @@ class UniformAirflow(base.BaseStrategy):
for info in destination_hosts:
instance = info['instance']
destination_node = info['node']
if self.compute_model.mapping.migrate_instance(
if self.compute_model.migrate_instance(
instance, source_node, destination_node):
parameters = {'migration_type': 'live',
'source_node': source_node.uuid,

View File

@@ -329,7 +329,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
# always use the host with lowerest CPU utilization
mig_destination_node = destination_hosts[0]['node']
# generate solution to migrate the instance to the dest server,
if self.compute_model.mapping.migrate_instance(
if self.compute_model.migrate_instance(
instance_src, source_node, mig_destination_node):
parameters = {'migration_type': 'live',
'source_node': source_node.uuid,

View File

@@ -350,7 +350,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
def create_migration_instance(self, mig_instance, mig_source_node,
mig_destination_node):
"""Create migration VM """
if self.compute_model.get_mapping().migrate_instance(
if self.compute_model.migrate_instance(
mig_instance, mig_source_node, mig_destination_node):
self.add_migration(mig_instance.uuid, 'live',
mig_source_node.uuid,