Use a graph-based cluster model instead of the mapping-based one

In this changeset, I use https://review.openstack.org/#/c/362730/
as an example to make the existing ModelRoot fully graph-based.

Change-Id: I3a1ec8674b885d75221035459233722c18972f67
Implements: blueprint graph-based-cluster-model
This commit is contained in:
Vincent Françoise
2017-01-10 16:06:06 +01:00
parent 41f579d464
commit d433d6b3c8
44 changed files with 1001 additions and 993 deletions

View File

@@ -34,3 +34,7 @@ class Model(object):
@abc.abstractmethod
def to_string(self):
raise NotImplementedError()
@abc.abstractmethod
def to_xml(self):
raise NotImplementedError()

View File

@@ -1,23 +1,21 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
# Copyright (c) 2017 Intel Innovation and Research Ireland Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log
from watcher.common import exception
from watcher.common import nova_helper
from watcher.decision_engine.model.collector import base
from watcher.decision_engine.model import element
@@ -30,13 +28,12 @@ LOG = log.getLogger(__name__)
class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
"""Nova cluster data model collector
The Nova cluster data model collector creates an in-memory
representation of the resources exposed by the compute service.
The Nova cluster data model collector creates an in-memory
representation of the resources exposed by the compute service.
"""
def __init__(self, config, osc=None):
super(NovaClusterDataModelCollector, self).__init__(config, osc)
self.wrapper = nova_helper.NovaHelper(osc=self.osc)
@property
def notification_endpoints(self):
@@ -62,49 +59,313 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
"""Build the compute cluster data model"""
LOG.debug("Building latest Nova cluster data model")
model = model_root.ModelRoot()
mem = element.Resource(element.ResourceType.memory)
num_cores = element.Resource(element.ResourceType.cpu_cores)
disk = element.Resource(element.ResourceType.disk)
disk_capacity = element.Resource(element.ResourceType.disk_capacity)
model.create_resource(mem)
model.create_resource(num_cores)
model.create_resource(disk)
model.create_resource(disk_capacity)
builder = ModelBuilder(self.osc)
return builder.execute()
flavor_cache = {}
nodes = self.wrapper.get_compute_node_list()
for n in nodes:
service = self.wrapper.nova.services.find(id=n.service['id'])
# create node in cluster_model_collector
node = element.ComputeNode(n.id)
node.uuid = service.host
node.hostname = n.hypervisor_hostname
# set capacity
mem.set_capacity(node, n.memory_mb)
disk.set_capacity(node, n.free_disk_gb)
disk_capacity.set_capacity(node, n.local_gb)
num_cores.set_capacity(node, n.vcpus)
node.state = n.state
node.status = n.status
model.add_node(node)
instances = self.wrapper.get_instances_by_node(str(service.host))
for v in instances:
# create VM in cluster_model_collector
instance = element.Instance()
instance.uuid = v.id
# nova/nova/compute/instance_states.py
instance.state = getattr(v, 'OS-EXT-STS:vm_state')
# set capacity
self.wrapper.get_flavor_instance(v, flavor_cache)
mem.set_capacity(instance, v.flavor['ram'])
# FIXME: update all strategies to use disk_capacity
# for instances instead of disk
disk.set_capacity(instance, v.flavor['disk'])
disk_capacity.set_capacity(instance, v.flavor['disk'])
num_cores.set_capacity(instance, v.flavor['vcpus'])
class ModelBuilder(object):
"""Build the graph-based model
model.map_instance(instance, node)
This model builder adds the following data:
return model
- Compute-related knowledge (Nova)
- TODO(v-francoise): Storage-related knowledge (Cinder)
- TODO(v-francoise): Network-related knowledge (Neutron)
NOTE(v-francoise): This model builder is meant to be extended in the future
to also include both storage and network information respectively coming
from Cinder and Neutron. Some preliminary work has been done in this
direction in https://review.openstack.org/#/c/362730 but since we cannot
guarantee a sufficient level of consistency for neither the storage nor the
network part before the end of the Ocata cycle, this work has been
re-scheduled for Pike. In the meantime, all the associated code has been
commented out.
"""
def __init__(self, osc):
    """Initialize the model builder with OpenStack service clients.

    :param osc: OpenStack clients wrapper used to reach Nova (and,
        eventually, Neutron and Cinder)
    """
    self.osc = osc
    # Fresh, empty graph model that execute() will populate.
    self.model = model_root.ModelRoot()
    self.nova = osc.nova()
    self.nova_helper = nova_helper.NovaHelper(osc=self.osc)
    # NOTE(review): Neutron/Cinder integration is deferred (see the
    # class docstring); these clients stay disabled until then.
    # self.neutron = osc.neutron()
    # self.cinder = osc.cinder()
def _add_physical_layer(self):
    """Populate the graph with the physical (hardware) layer.

    Every hypervisor reported by Nova becomes a compute node in the
    model.
    """
    hypervisors = self.nova_helper.get_compute_node_list()
    for hypervisor in hypervisors:
        self.add_compute_node(hypervisor)
def add_compute_node(self, node):
    """Build a model ComputeNode from a Nova hypervisor and add it.

    :param node: a hypervisor as returned by Nova
    """
    # Build and add base node.
    compute_node = self.build_compute_node(node)
    self.model.add_node(compute_node)

    # NOTE(v-francoise): we can encapsulate capabilities of the node
    # (special instruction sets of CPUs) in the attributes; as well as
    # sub-nodes can be added re-presenting e.g. GPUs/Accelerators etc.

    # NOTE(review): the sub-node decomposition below (disk, memory,
    # network, CPU as separate graph nodes) is deliberately disabled;
    # see the class docstring for the rationale.
    # # Build & add disk, memory, network and cpu nodes.
    # disk_id, disk_node = self.build_disk_compute_node(base_id, node)
    # self.add_node(disk_id, disk_node)
    # mem_id, mem_node = self.build_memory_compute_node(base_id, node)
    # self.add_node(mem_id, mem_node)
    # net_id, net_node = self._build_network_compute_node(base_id)
    # self.add_node(net_id, net_node)
    # cpu_id, cpu_node = self.build_cpu_compute_node(base_id, node)
    # self.add_node(cpu_id, cpu_node)
    # # Connect the base compute node to the dependant nodes.
    # self.add_edges_from([(base_id, disk_id), (base_id, mem_id),
    #                      (base_id, cpu_id), (base_id, net_id)],
    #                     label="contains")
def build_compute_node(self, node):
    """Translate a Nova hypervisor into a model ComputeNode.

    :param node: a node hypervisor instance
    :type node: :py:class:`~novaclient.v2.hypervisors.Hypervisor`
    :return: the corresponding :py:class:`element.ComputeNode`
    """
    # The service record carries the host name, which is used as the
    # node UUID in the model.
    compute_service = self.nova_helper.get_service(node.service["id"])
    return element.ComputeNode(
        id=node.id,
        human_id=None,  # TODO(v-francoise): get rid of it
        uuid=compute_service.host,
        hostname=node.hypervisor_hostname,
        memory=node.memory_mb,
        disk=node.free_disk_gb,
        disk_capacity=node.local_gb,
        vcpus=node.vcpus,
        state=node.state,
        status=node.status)
# def _build_network_compute_node(self, base_node):
# attributes = {}
# net_node = self._build_node("physical", "network", "NIC", attributes)
# net_id = "{}_network".format(base_node)
# return net_id, net_node
# def build_disk_compute_node(self, base_node, compute):
# # Build disk node attributes.
# disk_attributes = {
# "size_gb": compute.local_gb,
# "used_gb": compute.local_gb_used,
# "available_gb": compute.free_disk_gb}
# disk_node = self._build_node("physical", "storage", "disk",
# disk_attributes)
# disk_id = "{}_disk".format(base_node)
# return disk_id, disk_node
# def build_memory_compute_node(self, base_node, compute):
# # Build memory node attributes.
# memory_attrs = {"size_mb": compute.memory_mb,
# "used_mb": compute.memory_mb_used,
# "available_mb": compute.free_ram_mb}
# memory_node = self._build_node("physical", "memory", "memory",
# memory_attrs)
# memory_id = "{}_memory".format(base_node)
# return memory_id, memory_node
# def build_cpu_compute_node(self, base_node, compute):
# # Build memory node attributes.
# cpu_attributes = {"vcpus": compute.vcpus,
# "vcpus_used": compute.vcpus_used,
# "info": jsonutils.loads(compute.cpu_info)}
# cpu_node = self._build_node("physical", "cpu", "cpu", cpu_attributes)
# cpu_id = "{}_cpu".format(base_node)
# return cpu_id, cpu_node
# @staticmethod
# def _build_node(layer, category, node_type, attributes):
# return {"layer": layer, "category": category, "type": node_type,
# "attributes": attributes}
def _add_virtual_layer(self):
    """Add the virtual layer to the graph.

    This layer holds the virtual components of the infrastructure,
    such as VMs. Virtual network and storage are deferred (see the
    class docstring).
    """
    self._add_virtual_servers()
    # self._add_virtual_network()
    # self._add_virtual_storage()
def _add_virtual_servers(self):
    """Add every Nova instance to the graph and link it to its host.

    Instances that are not hosted anywhere, or whose host is unknown
    to the model, are still added as unmapped nodes rather than
    failing the whole build.
    """
    all_instances = self.nova_helper.get_instance_list()
    for inst in all_instances:
        # Add the instance node itself.
        instance = self._build_instance_node(inst)
        self.model.add_instance(instance)
        # Hostname of the compute node hosting this instance. The
        # extended attribute may be missing (e.g. while an instance is
        # being scheduled), so default to None instead of letting
        # getattr() raise AttributeError.
        cnode_uuid = getattr(inst, "OS-EXT-SRV-ATTR:host", None)
        if cnode_uuid is None:
            # The instance is not attached to any Compute node
            continue
        try:
            compute_node = self.model.get_node_by_uuid(
                cnode_uuid)
            # Connect the instance to its compute node
            self.model.add_edge(
                instance, compute_node, label='RUNS_ON')
        except exception.ComputeNodeNotFound:
            # Host reported by Nova is not in the model; leave the
            # instance unmapped.
            continue
def _build_instance_node(self, instance):
    """Create a graph node for a Nova server.

    :param instance: Nova VM object.
    :return: an :py:class:`element.Instance` node for the graph.
    """
    # Resource sizing comes from the flavor, not the server itself.
    flavor = self.nova_helper.get_flavor(instance.flavor["id"])
    return element.Instance(
        uuid=instance.id,
        human_id=instance.human_id,
        memory=flavor.ram,
        disk=flavor.disk,
        disk_capacity=flavor.disk,
        vcpus=flavor.vcpus,
        state=getattr(instance, "OS-EXT-STS:vm_state"))
# def _add_virtual_storage(self):
# try:
# volumes = self.cinder.volumes.list()
# except Exception:
# return
# for volume in volumes:
# volume_id, volume_node = self._build_storage_node(volume)
# self.add_node(volume_id, volume_node)
# host = self._get_volume_host_id(volume_node)
# self.add_edge(volume_id, host)
# # Add connections to an instance.
# if volume_node['attributes']['attachments']:
# for attachment in volume_node['attributes']['attachments']:
# self.add_edge(volume_id, attachment['server_id'],
# label='ATTACHED_TO')
# volume_node['attributes'].pop('attachments')
# def _add_virtual_network(self):
# try:
# routers = self.neutron.list_routers()
# except Exception:
# return
# for network in self.neutron.list_networks()['networks']:
# self.add_node(*self._build_network(network))
# for router in routers['routers']:
# self.add_node(*self._build_router(router))
# router_interfaces, _, compute_ports = self._group_ports()
# for router_interface in router_interfaces:
# interface = self._build_router_interface(router_interface)
# router_interface_id = interface[0]
# router_interface_node = interface[1]
# router_id = interface[2]
# self.add_node(router_interface_id, router_interface_node)
# self.add_edge(router_id, router_interface_id)
# network_id = router_interface_node['attributes']['network_id']
# self.add_edge(router_interface_id, network_id)
# for compute_port in compute_ports:
# cp_id, cp_node, instance_id = self._build_compute_port_node(
# compute_port)
# self.add_node(cp_id, cp_node)
# self.add_edge(cp_id, vm_id)
# net_id = cp_node['attributes']['network_id']
# self.add_edge(net_id, cp_id)
# # Connect port to physical node
# phys_net_node = "{}_network".format(cp_node['attributes']
# ['binding:host_id'])
# self.add_edge(cp_id, phys_net_node)
# def _get_volume_host_id(self, volume_node):
# host = volume_node['attributes']['os-vol-host-attr:host']
# if host.find('@') != -1:
# host = host.split('@')[0]
# elif host.find('#') != -1:
# host = host.split('#')[0]
# return "{}_disk".format(host)
# def _build_storage_node(self, volume_obj):
# volume = volume_obj.__dict__
# volume["name"] = volume["id"]
# volume.pop("id")
# volume.pop("manager")
# node = self._build_node("virtual", "storage", 'volume', volume)
# return volume["name"], node
# def _build_compute_port_node(self, compute_port):
# compute_port["name"] = compute_port["id"]
# compute_port.pop("id")
# nde_type = "{}_port".format(
# compute_port["device_owner"].split(":")[0])
# compute_port.pop("device_owner")
# device_id = compute_port["device_id"]
# compute_port.pop("device_id")
# node = self._build_node("virtual", "network", nde_type, compute_port)
# return compute_port["name"], node, device_id
# def _group_ports(self):
# router_interfaces = []
# floating_ips = []
# compute_ports = []
# interface_types = ["network:router_interface",
# 'network:router_gateway']
# for port in self.neutron.list_ports()['ports']:
# if port['device_owner'] in interface_types:
# router_interfaces.append(port)
# elif port['device_owner'].startswith('compute:'):
# compute_ports.append(port)
# elif port['device_owner'] == 'network:floatingip':
# floating_ips.append(port)
# return router_interfaces, floating_ips, compute_ports
# def _build_router_interface(self, interface):
# interface["name"] = interface["id"]
# interface.pop("id")
# node_type = interface["device_owner"].split(":")[1]
# node = self._build_node("virtual", "network", node_type, interface)
# return interface["name"], node, interface["device_id"]
# def _build_router(self, router):
# router_attrs = {"uuid": router['id'],
# "name": router['name'],
# "state": router['status']}
# node = self._build_node('virtual', 'network', 'router', router_attrs)
# return str(router['id']), node
# def _build_network(self, network):
# node = self._build_node('virtual', 'network', 'network', network)
# return network['id'], node
def execute(self):
    """Build and return the cluster model from live OpenStack data.

    The graph is populated physical layer first so that the virtual
    layer can wire its edges back to already-present physical nodes.
    """
    self._add_physical_layer()
    self._add_virtual_layer()
    return self.model

View File

@@ -17,13 +17,51 @@
# limitations under the License.
import abc
import collections
from lxml import etree
from oslo_log import log
import six
from watcher.objects import base
from watcher.objects import fields as wfields
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Element(object):
class Element(base.WatcherObject, base.WatcherObjectDictCompat):
# Initial version
VERSION = '1.0'
fields = {}
def __init__(self, context=None, **kwargs):
    """Create the element, filling in defaulted fields.

    :param context: request context forwarded to the versioned-object
        base class (may be None)
    :param kwargs: field values for this element
    """
    for name, field in self.fields.items():
        # The idea here is to force the initialization of unspecified
        # fields that have a default value: without this, reading an
        # unset field would raise instead of returning its default.
        if (name not in kwargs and not field.nullable and
            field.default != wfields.UnspecifiedDefault):
            kwargs[name] = field.default
    super(Element, self).__init__(context, **kwargs)
@abc.abstractmethod
def accept(self, visitor):
raise NotImplementedError()
def as_xml_element(self):
    """Serialize this element as an lxml XML element.

    Every readable field becomes a string attribute named after the
    field; fields that cannot be read are logged and skipped so one
    broken field does not prevent serializing the rest.
    """
    attrib = collections.OrderedDict()
    for field_name in self.fields:
        try:
            attrib[field_name] = str(self[field_name])
        except Exception as exc:
            # Best effort: log and carry on with the other fields.
            LOG.exception(exc)
    return etree.Element(self.__class__.__name__, attrib=attrib)

View File

@@ -19,39 +19,15 @@ import abc
import six
from watcher.decision_engine.model.element import base
from watcher.objects import fields as wfields
@six.add_metaclass(abc.ABCMeta)
class ComputeResource(base.Element):
def __init__(self):
self._uuid = ""
self._human_id = ""
self._hostname = ""
VERSION = '1.0'
@property
def uuid(self):
return self._uuid
@uuid.setter
def uuid(self, u):
self._uuid = u
@property
def hostname(self):
return self._hostname
@hostname.setter
def hostname(self, h):
self._hostname = h
@property
def human_id(self):
return self._human_id
@human_id.setter
def human_id(self, h):
self._human_id = h
def __str__(self):
return "[{0}]".format(self.uuid)
fields = {
"uuid": wfields.StringField(),
"human_id": wfields.StringField(default=""),
}

View File

@@ -17,6 +17,8 @@
import enum
from watcher.decision_engine.model.element import compute_resource
from watcher.objects import base
from watcher.objects import fields as wfields
class InstanceState(enum.Enum):
@@ -36,19 +38,17 @@ class InstanceState(enum.Enum):
ERROR = 'error'
@base.WatcherObjectRegistry.register_if(False)
class Instance(compute_resource.ComputeResource):
def __init__(self):
super(Instance, self).__init__()
self._state = InstanceState.ACTIVE.value
fields = {
"state": wfields.StringField(default=InstanceState.ACTIVE.value),
"memory": wfields.NonNegativeIntegerField(),
"disk": wfields.IntegerField(),
"disk_capacity": wfields.NonNegativeIntegerField(),
"vcpus": wfields.NonNegativeIntegerField(),
}
def accept(self, visitor):
raise NotImplementedError()
@property
def state(self):
return self._state
@state.setter
def state(self, state):
self._state = state

View File

@@ -17,6 +17,8 @@
import enum
from watcher.decision_engine.model.element import compute_resource
from watcher.objects import base
from watcher.objects import fields as wfields
class ServiceState(enum.Enum):
@@ -26,29 +28,20 @@ class ServiceState(enum.Enum):
DISABLED = 'disabled'
@base.WatcherObjectRegistry.register_if(False)
class ComputeNode(compute_resource.ComputeResource):
def __init__(self, id):
super(ComputeNode, self).__init__()
self.id = id
self._state = ServiceState.ONLINE.value
self._status = ServiceState.ENABLED.value
fields = {
"id": wfields.NonNegativeIntegerField(),
"hostname": wfields.StringField(),
"status": wfields.StringField(default=ServiceState.ENABLED.value),
"state": wfields.StringField(default=ServiceState.ONLINE.value),
"memory": wfields.NonNegativeIntegerField(),
"disk": wfields.IntegerField(),
"disk_capacity": wfields.NonNegativeIntegerField(),
"vcpus": wfields.NonNegativeIntegerField(),
}
def accept(self, visitor):
raise NotImplementedError()
@property
def state(self):
return self._state
@state.setter
def state(self, state):
self._state = state
@property
def status(self):
return self._status
@status.setter
def status(self, s):
self._status = s

View File

@@ -20,7 +20,8 @@ from watcher.common import exception
class ResourceType(enum.Enum):
cpu_cores = 'num_cores'
cpu_cores = 'vcpus'
vcpus = 'vcpus'
memory = 'memory'
disk = 'disk'
disk_capacity = 'disk_capacity'

View File

@@ -1,101 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_concurrency import lockutils
from oslo_log import log
from watcher._i18n import _LW
LOG = log.getLogger(__name__)
class Mapping(object):
    """Thread-safe bookkeeping of instance <-> compute node placement.

    All mapping keys are normalized to ``str`` so that ``map``,
    ``unmap_by_uuid`` and the lookup helpers always agree (the original
    code stored raw UUIDs on map but looked up ``str(uuid)``).
    """

    def __init__(self, model):
        self.model = model
        # node UUID (str) -> set of instance UUIDs (str)
        self.compute_node_mapping = {}
        # instance UUID (str) -> node UUID (str)
        self.instance_mapping = {}

    def map(self, node, instance):
        """Record the node where the instance is launched.

        :param node: the node
        :param instance: the virtual machine or instance
        """
        with lockutils.lock(__name__):
            node_uuid = str(node.uuid)
            instance_uuid = str(instance.uuid)
            # map node => instances
            self.compute_node_mapping.setdefault(node_uuid, set()).add(
                instance_uuid)
            # map instance => node
            self.instance_mapping[instance_uuid] = node_uuid

    def unmap(self, node, instance):
        """Remove the instance from the node.

        :param node: the node
        :param instance: the virtual machine or instance
        """
        self.unmap_by_uuid(node.uuid, instance.uuid)

    def unmap_by_uuid(self, node_uuid, instance_uuid):
        """Remove the instance (by UUID) from the node (by UUID).

        Unmapping an instance that is already absent from the node's
        set is a no-op instead of raising KeyError.
        """
        with lockutils.lock(__name__):
            node_uuid = str(node_uuid)
            instance_uuid = str(instance_uuid)
            if node_uuid in self.compute_node_mapping:
                # discard() tolerates an already-unmapped instance,
                # unlike remove() which would raise KeyError.
                self.compute_node_mapping[node_uuid].discard(
                    instance_uuid)
                # remove instance (pop with default for the same reason)
                self.instance_mapping.pop(instance_uuid, None)
            else:
                LOG.warning(
                    _LW("Trying to delete the instance %(instance)s but it "
                        "was not found on node %(node)s") %
                    {'instance': instance_uuid, 'node': node_uuid})

    def get_mapping(self):
        """Return the node UUID -> instance UUIDs mapping."""
        return self.compute_node_mapping

    def get_node_from_instance(self, instance):
        """Return the node hosting the given instance object."""
        return self.get_node_by_instance_uuid(instance.uuid)

    def get_node_by_instance_uuid(self, instance_uuid):
        """Get host information from the guest instance.

        :param instance_uuid: the UUID of the instance
        :return: the hosting node
        :raises KeyError: if the instance is not mapped to any node
        """
        return self.model.get_node_by_uuid(
            self.instance_mapping[str(instance_uuid)])

    def get_node_instances(self, node):
        """Get the set of instance UUIDs running on the node.

        :param node: the node
        :return: a set of instance UUIDs (empty if none)
        """
        return self.get_node_instances_by_uuid(node.uuid)

    def get_node_instances_by_uuid(self, node_uuid):
        """Return the instance UUIDs on the node, or an empty set."""
        return self.compute_node_mapping.get(str(node_uuid), set())

View File

@@ -1,39 +1,40 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
# Copyright (c) 2016 Intel Innovation and Research Ireland Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
"""
Openstack implementation of the cluster graph.
"""
from lxml import etree
import networkx as nx
from oslo_concurrency import lockutils
from oslo_log import log
import six
from watcher._i18n import _
from watcher.common import exception
from watcher.common import utils
from watcher.decision_engine.model import base
from watcher.decision_engine.model import element
from watcher.decision_engine.model import mapping
LOG = log.getLogger(__name__)
class ModelRoot(base.Model):
class ModelRoot(nx.DiGraph, base.Model):
"""Cluster graph for an Openstack cluster."""
def __init__(self, stale=False):
self._nodes = utils.Struct()
self._instances = utils.Struct()
self.mapping = mapping.Mapping(self)
self.resource = utils.Struct()
super(ModelRoot, self).__init__()
self.stale = stale
def __nonzero__(self):
@@ -41,35 +42,47 @@ class ModelRoot(base.Model):
__bool__ = __nonzero__
def assert_node(self, obj):
@staticmethod
def assert_node(obj):
if not isinstance(obj, element.ComputeNode):
raise exception.IllegalArgumentException(
message=_("'obj' argument type is not valid"))
message=_("'obj' argument type is not valid: %s") % type(obj))
def assert_instance(self, obj):
@staticmethod
def assert_instance(obj):
if not isinstance(obj, element.Instance):
raise exception.IllegalArgumentException(
message=_("'obj' argument type is not valid"))
@lockutils.synchronized("model_root")
def add_node(self, node):
self.assert_node(node)
self._nodes[node.uuid] = node
super(ModelRoot, self).add_node(node)
@lockutils.synchronized("model_root")
def remove_node(self, node):
self.assert_node(node)
if str(node.uuid) not in self._nodes:
try:
super(ModelRoot, self).remove_node(node)
except nx.NetworkXError as exc:
LOG.exception(exc)
raise exception.ComputeNodeNotFound(name=node.uuid)
else:
del self._nodes[node.uuid]
@lockutils.synchronized("model_root")
def add_instance(self, instance):
self.assert_instance(instance)
self._instances[instance.uuid] = instance
try:
super(ModelRoot, self).add_node(instance)
except nx.NetworkXError as exc:
LOG.exception(exc)
raise exception.InstanceNotFound(name=instance.uuid)
@lockutils.synchronized("model_root")
def remove_instance(self, instance):
self.assert_instance(instance)
del self._instances[instance.uuid]
super(ModelRoot, self).remove_node(instance)
@lockutils.synchronized("model_root")
def map_instance(self, instance, node):
"""Map a newly created instance to a node
@@ -82,38 +95,25 @@ class ModelRoot(base.Model):
instance = self.get_instance_by_uuid(instance)
if isinstance(node, six.string_types):
node = self.get_node_by_uuid(node)
self.assert_node(node)
self.assert_instance(instance)
self.add_instance(instance)
self.mapping.map(node, instance)
self.add_edge(instance, node)
@lockutils.synchronized("model_root")
def unmap_instance(self, instance, node):
"""Unmap an instance from a node
:param instance: :py:class:`~.Instance` object or instance UUID
:type instance: str or :py:class:`~.Instance`
:param node: :py:class:`~.ComputeNode` object or node UUID
:type node: str or :py:class:`~.Instance`
"""
if isinstance(instance, six.string_types):
instance = self.get_instance_by_uuid(instance)
if isinstance(node, six.string_types):
node = self.get_node_by_uuid(node)
self.add_instance(instance)
self.mapping.unmap(node, instance)
self.remove_edge(instance, node)
def delete_instance(self, instance, node=None):
if node is not None:
self.mapping.unmap(node, instance)
self.assert_instance(instance)
self.remove_instance(instance)
for resource in self.resource.values():
try:
resource.unset_capacity(instance)
except KeyError:
pass
@lockutils.synchronized("model_root")
def migrate_instance(self, instance, source_node, destination_node):
"""Migrate single instance from source_node to destination_node
@@ -122,96 +122,96 @@ class ModelRoot(base.Model):
:param destination_node:
:return:
"""
self.assert_instance(instance)
self.assert_node(source_node)
self.assert_node(destination_node)
if source_node == destination_node:
return False
# unmap
self.mapping.unmap(source_node, instance)
self.remove_edge(instance, source_node)
# map
self.mapping.map(destination_node, instance)
self.add_edge(instance, destination_node)
return True
@lockutils.synchronized("model_root")
def get_all_compute_nodes(self):
return self._nodes
return {cn.uuid: cn for cn in self.nodes()
if isinstance(cn, element.ComputeNode)}
def get_node_by_uuid(self, node_uuid):
if str(node_uuid) not in self._nodes:
raise exception.ComputeNodeNotFound(name=node_uuid)
return self._nodes[str(node_uuid)]
@lockutils.synchronized("model_root")
def get_node_by_uuid(self, uuid):
for graph_node in self.nodes():
if (isinstance(graph_node, element.ComputeNode) and
graph_node.uuid == uuid):
return graph_node
raise exception.ComputeNodeNotFound(name=uuid)
@lockutils.synchronized("model_root")
def get_instance_by_uuid(self, uuid):
if str(uuid) not in self._instances:
raise exception.InstanceNotFound(name=uuid)
return self._instances[str(uuid)]
return self._get_instance_by_uuid(uuid)
def _get_instance_by_uuid(self, uuid):
for graph_node in self.nodes():
if (isinstance(graph_node, element.Instance) and
graph_node.uuid == str(uuid)):
return graph_node
raise exception.InstanceNotFound(name=uuid)
@lockutils.synchronized("model_root")
def get_node_by_instance_uuid(self, instance_uuid):
"""Getting host information from the guest instance
:param instance_uuid: the uuid of the instance
:return: node
"""
if str(instance_uuid) not in self.mapping.instance_mapping:
raise exception.InstanceNotFound(name=instance_uuid)
return self.get_node_by_uuid(
self.mapping.instance_mapping[str(instance_uuid)])
instance = self._get_instance_by_uuid(instance_uuid)
for node in self.neighbors(instance):
if isinstance(node, element.ComputeNode):
return node
raise exception.ComputeNodeNotFound(name=instance_uuid)
@lockutils.synchronized("model_root")
def get_all_instances(self):
return self._instances
def get_mapping(self):
return self.mapping
def create_resource(self, r):
self.resource[str(r.name)] = r
return {inst.uuid: inst for inst in self.nodes()
if isinstance(inst, element.Instance)}
@lockutils.synchronized("model_root")
def get_resource_by_uuid(self, resource_id):
return self.resource[str(resource_id)]
# TODO(v-francoise): deprecate this method
# This is a trick to keep the compatibility with the old model root
class Resource(object):
def __init__(self, resource_id):
if isinstance(resource_id, element.ResourceType):
resource_id = resource_id.value
self.resource_id = resource_id
def get_capacity(self, element):
# We ignore element because value already contains the value
return getattr(element, self.resource_id)
return Resource(resource_id)
@lockutils.synchronized("model_root")
def get_node_instances(self, node):
return self.mapping.get_node_instances(node)
self.assert_node(node)
node_instances = []
for neighbor in self.predecessors(node):
if isinstance(neighbor, element.Instance):
node_instances.append(neighbor)
def _build_compute_node_element(self, compute_node):
attrib = collections.OrderedDict(
id=six.text_type(compute_node.id), uuid=compute_node.uuid,
human_id=compute_node.human_id, hostname=compute_node.hostname,
state=compute_node.state, status=compute_node.status)
for resource_name, resource in sorted(
self.resource.items(), key=lambda x: x[0]):
res_value = resource.get_capacity(compute_node)
if res_value is not None:
attrib[resource_name] = six.text_type(res_value)
compute_node_el = etree.Element("ComputeNode", attrib=attrib)
return compute_node_el
def _build_instance_element(self, instance):
attrib = collections.OrderedDict(
uuid=instance.uuid, human_id=instance.human_id,
hostname=instance.hostname, state=instance.state)
for resource_name, resource in sorted(
self.resource.items(), key=lambda x: x[0]):
res_value = resource.get_capacity(instance)
if res_value is not None:
attrib[resource_name] = six.text_type(res_value)
instance_el = etree.Element("Instance", attrib=attrib)
return instance_el
return node_instances
def to_string(self):
return self.to_xml()
def to_xml(self):
root = etree.Element("ModelRoot")
# Build compute node tree
for cn in sorted(self.get_all_compute_nodes().values(),
key=lambda cn: cn.uuid):
compute_node_el = self._build_compute_node_element(cn)
compute_node_el = cn.as_xml_element()
# Build mapped instance tree
node_instance_uuids = self.get_node_instances(cn)
for instance_uuid in sorted(node_instance_uuids):
instance = self.get_instance_by_uuid(instance_uuid)
instance_el = self._build_instance_element(instance)
node_instances = self.get_node_instances(cn)
for instance in sorted(node_instances, key=lambda x: x.uuid):
instance_el = instance.as_xml_element()
compute_node_el.append(instance_el)
root.append(compute_node_el)
@@ -221,51 +221,23 @@ class ModelRoot(base.Model):
key=lambda inst: inst.uuid):
try:
self.get_node_by_instance_uuid(instance.uuid)
except exception.InstanceNotFound:
root.append(self._build_instance_element(instance))
except (exception.InstanceNotFound, exception.ComputeNodeNotFound):
root.append(instance.as_xml_element())
return etree.tostring(root, pretty_print=True).decode('utf-8')
@classmethod
def from_xml(cls, data):
model = cls()
root = etree.fromstring(data)
mem = element.Resource(element.ResourceType.memory)
num_cores = element.Resource(element.ResourceType.cpu_cores)
disk = element.Resource(element.ResourceType.disk)
disk_capacity = element.Resource(element.ResourceType.disk_capacity)
model.create_resource(mem)
model.create_resource(num_cores)
model.create_resource(disk)
model.create_resource(disk_capacity)
for cn in root.findall('.//ComputeNode'):
node = element.ComputeNode(cn.get('id'))
node.uuid = cn.get('uuid')
node.hostname = cn.get('hostname')
# set capacity
mem.set_capacity(node, int(cn.get(str(mem.name))))
disk.set_capacity(node, int(cn.get(str(disk.name))))
disk_capacity.set_capacity(
node, int(cn.get(str(disk_capacity.name))))
num_cores.set_capacity(node, int(cn.get(str(num_cores.name))))
node.state = cn.get('state')
node.status = cn.get('status')
node = element.ComputeNode(**cn.attrib)
model.add_node(node)
for inst in root.findall('.//Instance'):
instance = element.Instance()
instance.uuid = inst.get('uuid')
instance.state = inst.get('state')
mem.set_capacity(instance, int(inst.get(str(mem.name))))
disk.set_capacity(instance, int(inst.get(str(disk.name))))
disk_capacity.set_capacity(
instance, int(inst.get(str(disk_capacity.name))))
num_cores.set_capacity(
instance, int(inst.get(str(num_cores.name))))
instance = element.Instance(**inst.attrib)
model.add_instance(instance)
parent = inst.getparent()
if parent.tag == 'ComputeNode':

View File

@@ -18,7 +18,7 @@
from oslo_log import log
from watcher._i18n import _LI
from watcher._i18n import _LI, _LW
from watcher.common import exception
from watcher.common import nova_helper
from watcher.decision_engine.model import element
@@ -40,14 +40,21 @@ class NovaNotification(base.NotificationEndpoint):
self._nova = nova_helper.NovaHelper()
return self._nova
def get_or_create_instance(self, uuid):
def get_or_create_instance(self, instance_uuid, node_uuid=None):
try:
instance = self.cluster_data_model.get_instance_by_uuid(uuid)
if node_uuid:
self.get_or_create_node(node_uuid)
except exception.ComputeNodeNotFound:
LOG.warning(_LW("Could not find compute node %(node)s for "
"instance %(instance)s"),
dict(node=node_uuid, instance=instance_uuid))
try:
instance = self.cluster_data_model.get_instance_by_uuid(
instance_uuid)
except exception.InstanceNotFound:
# The instance didn't exist yet so we create a new instance object
LOG.debug("New instance created: %s", uuid)
instance = element.Instance()
instance.uuid = uuid
LOG.debug("New instance created: %s", instance_uuid)
instance = element.Instance(uuid=instance_uuid)
self.cluster_data_model.add_instance(instance)
@@ -57,9 +64,11 @@ class NovaNotification(base.NotificationEndpoint):
instance_data = data['nova_object.data']
instance_flavor_data = instance_data['flavor']['nova_object.data']
instance.state = instance_data['state']
instance.hostname = instance_data['host_name']
instance.human_id = instance_data['display_name']
instance.update({
'state': instance_data['state'],
'hostname': instance_data['host_name'],
'human_id': instance_data['display_name'],
})
memory_mb = instance_flavor_data['memory_mb']
num_cores = instance_flavor_data['vcpus']
@@ -67,7 +76,7 @@ class NovaNotification(base.NotificationEndpoint):
self.update_capacity(element.ResourceType.memory, instance, memory_mb)
self.update_capacity(
element.ResourceType.cpu_cores, instance, num_cores)
element.ResourceType.vcpus, instance, num_cores)
self.update_capacity(
element.ResourceType.disk, instance, disk_gb)
self.update_capacity(
@@ -83,13 +92,14 @@ class NovaNotification(base.NotificationEndpoint):
self.update_instance_mapping(instance, node)
def update_capacity(self, resource_id, obj, value):
resource = self.cluster_data_model.get_resource_by_uuid(resource_id)
resource.set_capacity(obj, value)
setattr(obj, resource_id.value, value)
def legacy_update_instance(self, instance, data):
instance.state = data['state']
instance.hostname = data['hostname']
instance.human_id = data['display_name']
instance.update({
'state': data['state'],
'hostname': data['hostname'],
'human_id': data['display_name'],
})
memory_mb = data['memory_mb']
num_cores = data['vcpus']
@@ -97,7 +107,7 @@ class NovaNotification(base.NotificationEndpoint):
self.update_capacity(element.ResourceType.memory, instance, memory_mb)
self.update_capacity(
element.ResourceType.cpu_cores, instance, num_cores)
element.ResourceType.vcpus, instance, num_cores)
self.update_capacity(
element.ResourceType.disk, instance, disk_gb)
self.update_capacity(
@@ -115,28 +125,34 @@ class NovaNotification(base.NotificationEndpoint):
def update_compute_node(self, node, data):
"""Update the compute node using the notification data."""
node_data = data['nova_object.data']
node.hostname = node_data['host']
node.state = (
node_state = (
element.ServiceState.OFFLINE.value
if node_data['forced_down'] else element.ServiceState.ONLINE.value)
node.status = (
node_status = (
element.ServiceState.DISABLED.value
if node_data['disabled'] else element.ServiceState.ENABLED.value)
node.update({
'hostname': node_data['host'],
'state': node_state,
'status': node_status,
})
def create_compute_node(self, node_hostname):
"""Update the compute node by querying the Nova API."""
try:
_node = self.nova.get_compute_node_by_hostname(node_hostname)
node = element.ComputeNode(_node.id)
node.uuid = node_hostname
node.hostname = _node.hypervisor_hostname
node.state = _node.state
node.status = _node.status
node = element.ComputeNode(
id=_node.id,
uuid=node_hostname,
hostname=_node.hypervisor_hostname,
state=_node.state,
status=_node.status)
self.update_capacity(
element.ResourceType.memory, node, _node.memory_mb)
self.update_capacity(
element.ResourceType.cpu_cores, node, _node.vcpus)
element.ResourceType.vcpus, node, _node.vcpus)
self.update_capacity(
element.ResourceType.disk, node, _node.free_disk_gb)
self.update_capacity(
@@ -170,18 +186,20 @@ class NovaNotification(base.NotificationEndpoint):
return
try:
try:
old_node = self.get_or_create_node(node.uuid)
current_node = (
self.cluster_data_model.get_node_by_instance_uuid(
instance.uuid) or self.get_or_create_node(node.uuid))
except exception.ComputeNodeNotFound as exc:
LOG.exception(exc)
# If we can't create the node,
# we consider the instance as unmapped
old_node = None
current_node = None
LOG.debug("Mapped node %s found", node.uuid)
if node and node != old_node:
if current_node and node != current_node:
LOG.debug("Unmapping instance %s from %s",
instance.uuid, node.uuid)
self.cluster_data_model.unmap_instance(instance, old_node)
self.cluster_data_model.unmap_instance(instance, current_node)
except exception.InstanceNotFound:
# The instance didn't exist yet so we map it for the first time
LOG.debug("New instance: mapping it to %s", node.uuid)
@@ -221,6 +239,7 @@ class ServiceUpdated(VersionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
node_data = payload['nova_object.data']
node_uuid = node_data['host']
try:
@@ -262,10 +281,12 @@ class InstanceCreated(VersionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_data = payload['nova_object.data']
instance_uuid = instance_data['uuid']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = instance_data.get('host')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
self.update_instance(instance, payload)
@@ -294,9 +315,11 @@ class InstanceUpdated(VersionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_data = payload['nova_object.data']
instance_uuid = instance_data['uuid']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = instance_data.get('host')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
self.update_instance(instance, payload)
@@ -317,10 +340,12 @@ class InstanceDeletedEnd(VersionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_data = payload['nova_object.data']
instance_uuid = instance_data['uuid']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = instance_data.get('host')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
try:
node = self.get_or_create_node(instance_data['host'])
@@ -348,9 +373,11 @@ class LegacyInstanceUpdated(UnversionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_uuid = payload['instance_id']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = payload.get('node')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
self.legacy_update_instance(instance, payload)
@@ -371,9 +398,11 @@ class LegacyInstanceCreatedEnd(UnversionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_uuid = payload['instance_id']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = payload.get('node')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
self.legacy_update_instance(instance, payload)
@@ -394,8 +423,10 @@ class LegacyInstanceDeletedEnd(UnversionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_uuid = payload['instance_id']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = payload.get('node')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
try:
node = self.get_or_create_node(payload['host'])
@@ -423,8 +454,10 @@ class LegacyLiveMigratedEnd(UnversionnedNotificationEndpoint):
dict(event=event_type,
publisher=publisher_id,
metadata=metadata))
LOG.debug(payload)
instance_uuid = payload['instance_id']
instance = self.get_or_create_instance(instance_uuid)
node_uuid = payload.get('node')
instance = self.get_or_create_instance(instance_uuid, node_uuid)
self.legacy_update_instance(instance, payload)