Cinder model integration
This patch adds Cinder model integration. Change-Id: I31d5bc5e2bbed885d074d66bf7999d42cec15f10 Implements: blueprint cinder-model-integration
This commit is contained in:
209
watcher/decision_engine/model/collector/cinder.py
Normal file
209
watcher/decision_engine/model/collector/cinder.py
Normal file
@@ -0,0 +1,209 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright 2017 NEC Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.common import cinder_helper
|
||||
from watcher.common import exception
|
||||
from watcher.decision_engine.model.collector import base
|
||||
from watcher.decision_engine.model import element
|
||||
from watcher.decision_engine.model import model_root
|
||||
from watcher.decision_engine.model.notification import cinder
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class CinderClusterDataModelCollector(base.BaseClusterDataModelCollector):
    """Cinder cluster data model collector

    Builds and keeps an in-memory representation of the resources
    exposed by the block storage (Cinder) service.
    """

    def __init__(self, config, osc=None):
        super(CinderClusterDataModelCollector, self).__init__(config, osc)

    @property
    def notification_endpoints(self):
        """Associated notification endpoints

        :return: Associated notification endpoints
        :rtype: List of :py:class:`~.EventsNotificationEndpoint` instances
        """
        # Each endpoint class receives this collector so it can update
        # the shared cluster data model on incoming notifications.
        endpoint_classes = (
            cinder.CapacityNotificationEndpoint,
            cinder.VolumeCreateEnd,
            cinder.VolumeDeleteEnd,
            cinder.VolumeUpdateEnd,
            cinder.VolumeAttachEnd,
            cinder.VolumeDetachEnd,
            cinder.VolumeResizeEnd,
        )
        return [endpoint_cls(self) for endpoint_cls in endpoint_classes]

    def execute(self):
        """Build the storage cluster data model"""
        LOG.debug("Building latest Cinder cluster data model")
        return ModelBuilder(self.osc).execute()
|
||||
|
||||
|
||||
class ModelBuilder(object):
    """Build the graph-based model

    This model builder adds the following data:

    - Storage-related knowledge (Cinder)

    """
    def __init__(self, osc):
        # osc: OpenStack clients wrapper; gives access to the Cinder client.
        self.osc = osc
        # Graph model populated by execute().
        self.model = model_root.StorageModelRoot()
        self.cinder = osc.cinder()
        self.cinder_helper = cinder_helper.CinderHelper(osc=self.osc)

    def _add_physical_layer(self):
        """Add the physical layer of the graph.

        This includes components which represent actual infrastructure
        hardware.
        """
        for snode in self.cinder_helper.get_storage_node_list():
            self.add_storage_node(snode)
        for pool in self.cinder_helper.get_storage_pool_list():
            pool = self._build_storage_pool(pool)
            self.model.add_pool(pool)
            storage_name = getattr(pool, 'name')
            try:
                storage_node = self.model.get_node_by_name(
                    storage_name)
                # Connect the pool to its storage node
                self.model.map_pool(pool, storage_node)
            except exception.StorageNodeNotFound:
                # Node not present in the model: leave the pool unmapped.
                continue

    def add_storage_node(self, node):
        # Build and add base node.
        storage_node = self.build_storage_node(node)
        self.model.add_node(storage_node)

    def add_storage_pool(self, pool):
        # Build a pool element and register it in the model.
        storage_pool = self._build_storage_pool(pool)
        self.model.add_pool(storage_pool)

    def build_storage_node(self, node):
        """Build a storage node from a Cinder storage node

        :param node: A storage node
        :type node: :py:class:`~cinderclient.v2.services.Service`
        :return: A :py:class:`~.element.StorageNode` instance
        """
        # node.host is formatted as host@backendname since ocata,
        # or may be only host as of ocata
        backend = ""
        try:
            backend = node.host.split('@')[1]
        except IndexError:
            # No '@' in the host name: keep an empty backend name.
            pass

        volume_type = self.cinder_helper.get_volume_type_by_backendname(
            backend)

        # build up the storage node.
        node_attributes = {
            "host": node.host,
            "zone": node.zone,
            "state": node.state,
            "status": node.status,
            "volume_type": volume_type}

        storage_node = element.StorageNode(**node_attributes)
        return storage_node

    def _build_storage_pool(self, pool):
        """Build a storage pool from a Cinder storage pool

        :param pool: A storage pool
        :type pool: :py:class:`~cinderlient.v2.capabilities.Capabilities`
        :return: A :py:class:`~.element.Pool` instance
        """
        # build up the storage pool.
        node_attributes = {
            "name": pool.name,
            "total_volumes": pool.total_volumes,
            "total_capacity_gb": pool.total_capacity_gb,
            "free_capacity_gb": pool.free_capacity_gb,
            "provisioned_capacity_gb": pool.provisioned_capacity_gb,
            "allocated_capacity_gb": pool.allocated_capacity_gb}

        storage_pool = element.Pool(**node_attributes)
        return storage_pool

    def _add_virtual_layer(self):
        """Add the virtual layer to the graph.

        This layer is the virtual components of the infrastructure.
        """
        self._add_virtual_storage()

    def _add_virtual_storage(self):
        # Add every Cinder volume and map it onto its hosting pool.
        volumes = self.cinder_helper.get_volume_list()
        for vol in volumes:
            volume = self._build_volume_node(vol)
            self.model.add_volume(volume)
            # 'os-vol-host-attr:host' carries the pool name for the volume.
            pool_name = getattr(vol, 'os-vol-host-attr:host')
            if pool_name is None:
                # The volume is not attached to any pool
                continue
            try:
                pool = self.model.get_pool_by_pool_name(
                    pool_name)
                self.model.map_volume(volume, pool)
            except exception.PoolNotFound:
                # Pool not in the model: keep the volume unmapped.
                continue

    def _build_volume_node(self, volume):
        """Build a volume node

        Create a volume node for the graph using cinder and the
        `volume` cinder object.
        :param volume: Cinder Volume object.
        :return: A volume node for the graph.
        """
        # Keep only the attachment keys the model element cares about.
        attachments = [{k: v for k, v in six.iteritems(d) if k in (
            'server_id', 'attachment_id')} for d in volume.attachments]

        volume_attributes = {
            "uuid": volume.id,
            "size": volume.size,
            "status": volume.status,
            "attachments": attachments,
            # Normalize None values to "" for optional string attributes.
            "name": volume.name or "",
            "multiattach": volume.multiattach,
            "snapshot_id": volume.snapshot_id or "",
            "project_id": getattr(volume, 'os-vol-tenant-attr:tenant_id'),
            "metadata": volume.metadata,
            "bootable": volume.bootable}

        return element.Volume(**volume_attributes)

    def execute(self):
        """Instantiates the graph with the openstack cluster data.

        The graph is populated along 2 layers: virtual and physical. As each
        new layer is built connections are made back to previous layers.
        """
        self._add_physical_layer()
        self._add_virtual_layer()
        return self.model
|
||||
@@ -18,11 +18,23 @@
|
||||
|
||||
from watcher.decision_engine.model.element import instance
from watcher.decision_engine.model.element import node
from watcher.decision_engine.model.element import volume

ServiceState = node.ServiceState
ComputeNode = node.ComputeNode
StorageNode = node.StorageNode
Pool = node.Pool

InstanceState = instance.InstanceState
Instance = instance.Instance
VolumeState = volume.VolumeState
Volume = volume.Volume

# Public API of the element package: compute elements plus the
# storage elements added by the Cinder model integration.
# (The previous 4-entry assignment was dead code, immediately
# overwritten by this one, and has been removed.)
__all__ = ['ServiceState',
           'ComputeNode',
           'InstanceState',
           'Instance',
           'StorageNode',
           'Pool',
           'VolumeState',
           'Volume']
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
import enum
|
||||
|
||||
from watcher.decision_engine.model.element import compute_resource
|
||||
from watcher.decision_engine.model.element import storage_resource
|
||||
from watcher.objects import base
|
||||
from watcher.objects import fields as wfields
|
||||
|
||||
@@ -45,3 +46,35 @@ class ComputeNode(compute_resource.ComputeResource):
|
||||
|
||||
def accept(self, visitor):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@base.WatcherObjectRegistry.register_if(False)
class StorageNode(storage_resource.StorageResource):
    """Storage node element (a Cinder service host) of the storage model."""

    fields = {
        "host": wfields.StringField(),
        "zone": wfields.StringField(),
        # Administrative status; defaults to enabled.
        "status": wfields.StringField(default=ServiceState.ENABLED.value),
        # Operational state; defaults to online.
        "state": wfields.StringField(default=ServiceState.ONLINE.value),
        "volume_type": wfields.StringField()
    }

    def accept(self, visitor):
        # Visitor protocol is not implemented for storage elements.
        raise NotImplementedError()
|
||||
|
||||
|
||||
@base.WatcherObjectRegistry.register_if(False)
class Pool(storage_resource.StorageResource):
    """Storage pool element of the storage model.

    Capacity fields are expressed in GiB, mirroring the Cinder
    pool capabilities they are built from.
    """

    fields = {
        "name": wfields.StringField(),
        "total_volumes": wfields.NonNegativeIntegerField(),
        "total_capacity_gb": wfields.NonNegativeIntegerField(),
        "free_capacity_gb": wfields.NonNegativeIntegerField(),
        "provisioned_capacity_gb": wfields.NonNegativeIntegerField(),
        "allocated_capacity_gb": wfields.NonNegativeIntegerField(),
        # NOTE(review): virtual_free is only set by the capacity
        # notification path, not by the model builder — confirm that
        # a missing value is acceptable for freshly built models.
        "virtual_free": wfields.NonNegativeIntegerField(),
    }

    def accept(self, visitor):
        # Visitor protocol is not implemented for storage elements.
        raise NotImplementedError()
|
||||
|
||||
33
watcher/decision_engine/model/element/storage_resource.py
Normal file
33
watcher/decision_engine/model/element/storage_resource.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright 2017 NEC Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
from watcher.decision_engine.model.element import base
|
||||
from watcher.objects import fields as wfields
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
class StorageResource(base.Element):
    """Abstract base class for all storage model elements."""

    VERSION = '1.0'

    fields = {
        "uuid": wfields.StringField(),
        # Human-readable identifier; empty when not provided.
        "human_id": wfields.StringField(default=""),
    }
|
||||
56
watcher/decision_engine/model/element/volume.py
Normal file
56
watcher/decision_engine/model/element/volume.py
Normal file
@@ -0,0 +1,56 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright 2017 NEC Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import enum
|
||||
|
||||
from watcher.decision_engine.model.element import storage_resource
|
||||
from watcher.objects import base
|
||||
from watcher.objects import fields as wfields
|
||||
|
||||
|
||||
class VolumeState(enum.Enum):
    """States a Cinder volume can report."""
    # https://developer.openstack.org/api-ref/block-storage/v3/#volumes-volumes

    CREATING = 'creating'
    AVAILABLE = 'available'
    ATTACHING = 'attaching'
    IN_USE = 'in-use'
    DELETING = 'deleting'
    ERROR = 'error'
    ERROR_DELETING = 'error_deleting'
    BACKING_UP = 'backing-up'
    RESTORING_BACKUP = 'restoring-backup'
    ERROR_RESTORING = 'error_restoring'
    ERROR_EXTENDING = 'error_extending'
|
||||
|
||||
|
||||
@base.WatcherObjectRegistry.register_if(False)
class Volume(storage_resource.StorageResource):
    """Volume element of the storage model."""

    fields = {
        # Size in GiB.
        "size": wfields.NonNegativeIntegerField(),
        "status": wfields.StringField(default=VolumeState.AVAILABLE.value),
        # List of dicts with 'server_id'/'attachment_id' keys.
        "attachments": wfields.FlexibleListOfDictField(),
        "name": wfields.StringField(),
        "multiattach": wfields.BooleanField(),
        # NOTE(review): the model builder passes "" when there is no
        # snapshot/project — confirm UUIDField accepts an empty string.
        "snapshot_id": wfields.UUIDField(),
        "project_id": wfields.UUIDField(),
        "metadata": wfields.JsonField(),
        "bootable": wfields.BooleanField()
    }

    def accept(self, visitor):
        # Visitor protocol is not implemented for storage elements.
        raise NotImplementedError()
|
||||
@@ -242,3 +242,300 @@ class ModelRoot(nx.DiGraph, base.Model):
|
||||
return node1.as_dict() == node2.as_dict()
|
||||
return nx.algorithms.isomorphism.isomorph.is_isomorphic(
|
||||
G1, G2, node_match=node_match)
|
||||
|
||||
|
||||
class StorageModelRoot(nx.DiGraph, base.Model):
    """Cluster graph for an Openstack cluster."""

    def __init__(self, stale=False):
        super(StorageModelRoot, self).__init__()
        # When True the model is considered out of date; the instance
        # then evaluates as falsy (see __nonzero__/__bool__).
        self.stale = stale

    def __nonzero__(self):
        return not self.stale

    __bool__ = __nonzero__

    @staticmethod
    def assert_node(obj):
        # Guard: obj must be a StorageNode element.
        if not isinstance(obj, element.StorageNode):
            raise exception.IllegalArgumentException(
                message=_("'obj' argument type is not valid: %s") % type(obj))

    @staticmethod
    def assert_pool(obj):
        # Guard: obj must be a Pool element.
        if not isinstance(obj, element.Pool):
            raise exception.IllegalArgumentException(
                message=_("'obj' argument type is not valid: %s") % type(obj))

    @staticmethod
    def assert_volume(obj):
        # Guard: obj must be a Volume element.
        if not isinstance(obj, element.Volume):
            raise exception.IllegalArgumentException(
                message=_("'obj' argument type is not valid: %s") % type(obj))

    @lockutils.synchronized("storage_model")
    def add_node(self, node):
        """Add a storage node to the graph, keyed by its host name."""
        self.assert_node(node)
        super(StorageModelRoot, self).add_node(node.host, node)

    @lockutils.synchronized("storage_model")
    def add_pool(self, pool):
        """Add a storage pool to the graph, keyed by its pool name."""
        self.assert_pool(pool)
        super(StorageModelRoot, self).add_node(pool.name, pool)

    @lockutils.synchronized("storage_model")
    def remove_node(self, node):
        """Remove a storage node; raise StorageNodeNotFound if absent."""
        self.assert_node(node)
        try:
            super(StorageModelRoot, self).remove_node(node.host)
        except nx.NetworkXError as exc:
            LOG.exception(exc)
            raise exception.StorageNodeNotFound(name=node.host)

    @lockutils.synchronized("storage_model")
    def remove_pool(self, pool):
        """Remove a storage pool; raise PoolNotFound if absent."""
        self.assert_pool(pool)
        try:
            super(StorageModelRoot, self).remove_node(pool.name)
        except nx.NetworkXError as exc:
            LOG.exception(exc)
            raise exception.PoolNotFound(name=pool.name)

    @lockutils.synchronized("storage_model")
    def map_pool(self, pool, node):
        """Map a newly created pool to a node

        :param pool: :py:class:`~.Pool` object or pool name
        :param node: :py:class:`~.StorageNode` object or node host
        """
        # NOTE(review): the string branches call other methods that are
        # also @lockutils.synchronized("storage_model") — confirm the lock
        # is reentrant in this configuration, otherwise passing names
        # instead of objects could deadlock.
        if isinstance(pool, six.string_types):
            pool = self.get_pool_by_pool_name(pool)
        if isinstance(node, six.string_types):
            node = self.get_node_by_name(node)
        self.assert_node(node)
        self.assert_pool(pool)

        # Edge direction: pool -> node.
        self.add_edge(pool.name, node.host)

    @lockutils.synchronized("storage_model")
    def unmap_pool(self, pool, node):
        """Unmap a pool from a node

        :param pool: :py:class:`~.Pool` object or pool name
        :param node: :py:class:`~.StorageNode` object or node name
        """
        if isinstance(pool, six.string_types):
            pool = self.get_pool_by_pool_name(pool)
        if isinstance(node, six.string_types):
            node = self.get_node_by_name(node)

        self.remove_edge(pool.name, node.host)

    @lockutils.synchronized("storage_model")
    def add_volume(self, volume):
        """Add a volume to the graph, keyed by its UUID."""
        self.assert_volume(volume)
        super(StorageModelRoot, self).add_node(volume.uuid, volume)

    @lockutils.synchronized("storage_model")
    def remove_volume(self, volume):
        """Remove a volume; raise VolumeNotFound if absent."""
        self.assert_volume(volume)
        try:
            super(StorageModelRoot, self).remove_node(volume.uuid)
        except nx.NetworkXError as exc:
            LOG.exception(exc)
            raise exception.VolumeNotFound(name=volume.uuid)

    @lockutils.synchronized("storage_model")
    def map_volume(self, volume, pool):
        """Map a newly created volume to a pool

        :param volume: :py:class:`~.Volume` object or volume UUID
        :param pool: :py:class:`~.Pool` object or pool name
        """
        if isinstance(volume, six.string_types):
            volume = self.get_volume_by_uuid(volume)
        if isinstance(pool, six.string_types):
            pool = self.get_pool_by_pool_name(pool)
        self.assert_pool(pool)
        self.assert_volume(volume)

        # Edge direction: volume -> pool.
        self.add_edge(volume.uuid, pool.name)

    @lockutils.synchronized("storage_model")
    def unmap_volume(self, volume, pool):
        """Unmap a volume from a pool

        :param volume: :py:class:`~.Volume` object or volume UUID
        :param pool: :py:class:`~.Pool` object or pool name
        """
        if isinstance(volume, six.string_types):
            volume = self.get_volume_by_uuid(volume)
        if isinstance(pool, six.string_types):
            pool = self.get_pool_by_pool_name(pool)

        self.remove_edge(volume.uuid, pool.name)

    def delete_volume(self, volume):
        # Thin wrapper; remove_volume already takes the model lock.
        self.assert_volume(volume)
        self.remove_volume(volume)

    @lockutils.synchronized("storage_model")
    def get_all_storage_nodes(self):
        """Return a {host: StorageNode} mapping of all nodes in the graph."""
        return {host: cn for host, cn in self.nodes(data=True)
                if isinstance(cn, element.StorageNode)}

    @lockutils.synchronized("storage_model")
    def get_node_by_name(self, name):
        """Get a node by node name

        :param name: node name; a "host@backend#pool" string is accepted
            and truncated at the '#' to the node part
        """
        try:
            return self._get_by_name(name.split("#")[0])
        except exception.StorageResourceNotFound:
            raise exception.StorageNodeNotFound(name=name)

    @lockutils.synchronized("storage_model")
    def get_pool_by_pool_name(self, name):
        """Get a pool by its full pool name."""
        try:
            return self._get_by_name(name)
        except exception.StorageResourceNotFound:
            raise exception.PoolNotFound(name=name)

    @lockutils.synchronized("storage_model")
    def get_volume_by_uuid(self, uuid):
        """Get a volume by its UUID."""
        try:
            return self._get_by_uuid(uuid)
        except exception.StorageResourceNotFound:
            raise exception.VolumeNotFound(name=uuid)

    def _get_by_uuid(self, uuid):
        # Unsynchronized lookup helper (callers hold the lock).
        try:
            return self.node[uuid]
        except Exception as exc:
            LOG.exception(exc)
            raise exception.StorageResourceNotFound(name=uuid)

    def _get_by_name(self, name):
        # Unsynchronized lookup helper (callers hold the lock).
        try:
            return self.node[name]
        except Exception as exc:
            LOG.exception(exc)
            raise exception.StorageResourceNotFound(name=name)

    @lockutils.synchronized("storage_model")
    def get_node_by_pool_name(self, pool_name):
        """Return the StorageNode the named pool is mapped to."""
        pool = self._get_by_name(pool_name)
        # Follow outgoing edges (pool -> node) to find the node.
        for node_name in self.neighbors(pool.name):
            node = self._get_by_name(node_name)
            if isinstance(node, element.StorageNode):
                return node
        raise exception.StorageNodeNotFound(name=pool_name)

    @lockutils.synchronized("storage_model")
    def get_node_pools(self, node):
        """Return the pools mapped to the given storage node."""
        self.assert_node(node)
        node_pools = []
        # Pools point at nodes, so look at the node's predecessors.
        for pool_name in self.predecessors(node.host):
            pool = self._get_by_name(pool_name)
            if isinstance(pool, element.Pool):
                node_pools.append(pool)

        return node_pools

    @lockutils.synchronized("storage_model")
    def get_pool_by_volume(self, volume):
        """Return the pool the given volume is mapped to."""
        self.assert_volume(volume)
        volume = self._get_by_uuid(volume.uuid)
        # Follow outgoing edges (volume -> pool) to find the pool.
        for p in self.neighbors(volume.uuid):
            pool = self._get_by_name(p)
            if isinstance(pool, element.Pool):
                return pool
        raise exception.PoolNotFound(name=volume.uuid)

    @lockutils.synchronized("storage_model")
    def get_all_volumes(self):
        """Return a {uuid: Volume} mapping of all volumes in the graph."""
        return {name: vol for name, vol in self.nodes(data=True)
                if isinstance(vol, element.Volume)}

    @lockutils.synchronized("storage_model")
    def get_pool_volumes(self, pool):
        """Return the volumes mapped to the given pool."""
        self.assert_pool(pool)
        volumes = []
        # Volumes point at pools, so look at the pool's predecessors.
        for vol in self.predecessors(pool.name):
            volume = self._get_by_uuid(vol)
            if isinstance(volume, element.Volume):
                volumes.append(volume)

        return volumes

    def to_string(self):
        return self.to_xml()

    def to_xml(self):
        """Serialize the model to a pretty-printed XML string."""
        root = etree.Element("ModelRoot")
        # Build storage node tree
        for cn in sorted(self.get_all_storage_nodes().values(),
                         key=lambda cn: cn.host):
            storage_node_el = cn.as_xml_element()
            # Build mapped pool tree
            node_pools = self.get_node_pools(cn)
            for pool in sorted(node_pools, key=lambda x: x.name):
                pool_el = pool.as_xml_element()
                storage_node_el.append(pool_el)
                # Build mapped volume tree
                pool_volumes = self.get_pool_volumes(pool)
                for volume in sorted(pool_volumes, key=lambda x: x.uuid):
                    volume_el = volume.as_xml_element()
                    pool_el.append(volume_el)

            root.append(storage_node_el)

        # Build unmapped volume tree (i.e. not assigned to any pool)
        for volume in sorted(self.get_all_volumes().values(),
                             key=lambda vol: vol.uuid):
            try:
                self.get_pool_by_volume(volume)
            except (exception.VolumeNotFound, exception.PoolNotFound):
                root.append(volume.as_xml_element())

        return etree.tostring(root, pretty_print=True).decode('utf-8')

    @classmethod
    def from_xml(cls, data):
        """Rebuild a model from the XML produced by :meth:`to_xml`."""
        model = cls()

        root = etree.fromstring(data)
        for cn in root.findall('.//StorageNode'):
            node = element.StorageNode(**cn.attrib)
            model.add_node(node)

        for p in root.findall('.//Pool'):
            pool = element.Pool(**p.attrib)
            model.add_pool(pool)

            parent = p.getparent()
            if parent.tag == 'StorageNode':
                node = model.get_node_by_name(parent.get('host'))
                model.map_pool(pool, node)
            else:
                # NOTE(review): pool was already added above, so this
                # re-add is redundant (harmless with networkx add_node).
                model.add_pool(pool)

        for vol in root.findall('.//Volume'):
            volume = element.Volume(**vol.attrib)
            model.add_volume(volume)

            parent = vol.getparent()
            if parent.tag == 'Pool':
                pool = model.get_pool_by_pool_name(parent.get('name'))
                model.map_volume(volume, pool)
            else:
                # NOTE(review): redundant re-add, see the pool loop above.
                model.add_volume(volume)

        return model

    @classmethod
    def is_isomorphic(cls, G1, G2):
        """Structural graph comparison (no node-data matching)."""
        return nx.algorithms.isomorphism.isomorph.is_isomorphic(
            G1, G2)
|
||||
|
||||
387
watcher/decision_engine/model/notification/cinder.py
Normal file
387
watcher/decision_engine/model/notification/cinder.py
Normal file
@@ -0,0 +1,387 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright 2017 NEC Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from oslo_log import log
|
||||
from watcher.common import cinder_helper
|
||||
from watcher.common import exception
|
||||
from watcher.decision_engine.model import element
|
||||
from watcher.decision_engine.model.notification import base
|
||||
from watcher.decision_engine.model.notification import filtering
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class CinderNotification(base.NotificationEndpoint):
|
||||
|
||||
def __init__(self, collector):
|
||||
super(CinderNotification, self).__init__(collector)
|
||||
self._cinder = None
|
||||
|
||||
@property
|
||||
def cinder(self):
|
||||
if self._cinder is None:
|
||||
self._cinder = cinder_helper.CinderHelper()
|
||||
return self._cinder
|
||||
|
||||
def update_pool(self, pool, data):
|
||||
"""Update the storage pool using the notification data."""
|
||||
pool.update({
|
||||
"total_capacity_gb": data['total'],
|
||||
"free_capacity_gb": data['free'],
|
||||
"provisioned_capacity_gb": data['provisioned'],
|
||||
"allocated_capacity_gb": data['allocated'],
|
||||
"virtual_free": data['virtual_free']
|
||||
})
|
||||
|
||||
node_name = pool.name.split("#")[0]
|
||||
node = self.get_or_create_node(node_name)
|
||||
self.cluster_data_model.map_pool(pool, node)
|
||||
LOG.debug("Mapped pool %s to %s", pool.name, node.host)
|
||||
|
||||
def update_pool_by_api(self, pool):
|
||||
"""Update the storage pool using the API data."""
|
||||
if not pool:
|
||||
return
|
||||
_pool = self.cinder.get_storage_pool_by_name(pool.name)
|
||||
pool.update({
|
||||
"total_volumes": _pool.total_volumes,
|
||||
"total_capacity_gb": _pool.total_capacity_gb,
|
||||
"free_capacity_gb": _pool.free_capacity_gb,
|
||||
"provisioned_capacity_gb": _pool.provisioned_capacity_gb,
|
||||
"allocated_capacity_gb": _pool.allocated_capacity_gb
|
||||
})
|
||||
node_name = pool.name.split("#")[0]
|
||||
node = self.get_or_create_node(node_name)
|
||||
self.cluster_data_model.map_pool(pool, node)
|
||||
LOG.debug("Mapped pool %s to %s", pool.name, node.host)
|
||||
|
||||
def create_storage_node(self, name):
|
||||
"""Create the storage node by querying the Cinder API."""
|
||||
try:
|
||||
_node = self.cinder.get_storage_node_by_name(name)
|
||||
_volume_type = self.cinder.get_volume_type_by_backendname(
|
||||
# name is formatted as host@backendname
|
||||
name.split('@')[1])
|
||||
storage_node = element.StorageNode(
|
||||
host=_node.host,
|
||||
zone=_node.zone,
|
||||
state=_node.state,
|
||||
status=_node.status,
|
||||
volume_type=_volume_type)
|
||||
return storage_node
|
||||
except Exception as exc:
|
||||
LOG.exception(exc)
|
||||
LOG.debug("Could not create storage node %s.", name)
|
||||
raise exception.StorageNodeNotFound(name=name)
|
||||
|
||||
def get_or_create_node(self, name):
|
||||
"""Get storage node by name, otherwise create storage node"""
|
||||
if name is None:
|
||||
LOG.debug("Storage node name not provided: skipping")
|
||||
return
|
||||
try:
|
||||
return self.cluster_data_model.get_node_by_name(name)
|
||||
except exception.StorageNodeNotFound:
|
||||
# The node didn't exist yet so we create a new node object
|
||||
node = self.create_storage_node(name)
|
||||
LOG.debug("New storage node created: %s", name)
|
||||
self.cluster_data_model.add_node(node)
|
||||
LOG.debug("New storage node added: %s", name)
|
||||
return node
|
||||
|
||||
def create_pool(self, pool_name):
|
||||
"""Create the storage pool by querying the Cinder API."""
|
||||
try:
|
||||
_pool = self.cinder.get_storage_pool_by_name(pool_name)
|
||||
pool = element.Pool(
|
||||
name=_pool.name,
|
||||
total_volumes=_pool.total_volumes,
|
||||
total_capacity_gb=_pool.total_capacity_gb,
|
||||
free_capacity_gb=_pool.free_capacity_gb,
|
||||
provisioned_capacity_gb=_pool.provisioned_capacity_gb,
|
||||
allocated_capacity_gb=_pool.allocated_capacity_gb)
|
||||
return pool
|
||||
except Exception as exc:
|
||||
LOG.exception(exc)
|
||||
LOG.debug("Could not refresh the pool %s.", pool_name)
|
||||
raise exception.PoolNotFound(name=pool_name)
|
||||
|
||||
def get_or_create_pool(self, name):
|
||||
if not name:
|
||||
LOG.debug("Pool name not provided: skipping")
|
||||
return
|
||||
try:
|
||||
return self.cluster_data_model.get_pool_by_pool_name(name)
|
||||
except exception.PoolNotFound:
|
||||
# The pool didn't exist yet so we create a new pool object
|
||||
pool = self.create_pool(name)
|
||||
LOG.debug("New storage pool created: %s", name)
|
||||
self.cluster_data_model.add_pool(pool)
|
||||
LOG.debug("New storage pool added: %s", name)
|
||||
return pool
|
||||
|
||||
def get_or_create_volume(self, volume_id, pool_name=None):
|
||||
try:
|
||||
if pool_name:
|
||||
self.get_or_create_pool(pool_name)
|
||||
except exception.PoolNotFound:
|
||||
LOG.warning("Could not find storage pool %(pool)s for "
|
||||
"volume %(volume)s",
|
||||
dict(pool=pool_name, volume=volume_id))
|
||||
try:
|
||||
return self.cluster_data_model.get_volume_by_uuid(volume_id)
|
||||
except exception.VolumeNotFound:
|
||||
# The volume didn't exist yet so we create a new volume object
|
||||
volume = element.Volume(uuid=volume_id)
|
||||
self.cluster_data_model.add_volume(volume)
|
||||
return volume
|
||||
|
||||
def update_volume(self, volume, data):
|
||||
"""Update the volume using the notification data."""
|
||||
|
||||
def _keyReplace(key):
|
||||
if key == 'instance_uuid':
|
||||
return 'server_id'
|
||||
if key == 'id':
|
||||
return 'attachment_id'
|
||||
|
||||
attachments = [
|
||||
{_keyReplace(k): v for k, v in six.iteritems(d)
|
||||
if k in ('instance_uuid', 'id')}
|
||||
for d in data['volume_attachment']
|
||||
]
|
||||
|
||||
# glance_metadata is provided if volume is bootable
|
||||
bootable = False
|
||||
if 'glance_metadata' in data:
|
||||
bootable = True
|
||||
|
||||
volume.update({
|
||||
"name": data['display_name'] or "",
|
||||
"size": data['size'],
|
||||
"status": data['status'],
|
||||
"attachments": attachments,
|
||||
"snapshot_id": data['snapshot_id'] or "",
|
||||
"project_id": data['tenant_id'],
|
||||
"metadata": data['metadata'],
|
||||
"bootable": bootable
|
||||
})
|
||||
|
||||
try:
|
||||
# if volume is under pool, let's update pool element.
|
||||
# get existing pool or create pool by cinder api
|
||||
pool = self.get_or_create_pool(data['host'])
|
||||
self.update_pool_by_api(pool)
|
||||
|
||||
except exception.PoolNotFound as exc:
|
||||
LOG.exception(exc)
|
||||
pool = None
|
||||
|
||||
self.update_volume_mapping(volume, pool)
|
||||
|
||||
    def update_volume_mapping(self, volume, pool):
        """Map *volume* to *pool* in the model, unmapping it first if moved.

        A ``None`` pool leaves the volume registered but unmapped.
        """
        if pool is None:
            # Keep the volume in the model; it stays unmapped until a
            # later notification provides pool information.
            self.cluster_data_model.add_volume(volume)
            LOG.debug("Volume %s not yet attached to any pool: skipping",
                      volume.uuid)
            return
        try:
            try:
                # Pool the model currently maps this volume to; fall back
                # to fetching/creating the target pool by name.
                current_pool = (
                    self.cluster_data_model.get_pool_by_volume(
                        volume) or self.get_or_create_pool(pool.name))
            except exception.PoolNotFound as exc:
                LOG.exception(exc)
                # If we can't create the pool,
                # we consider the volume as unmapped
                current_pool = None

            LOG.debug("Mapped pool %s found", pool.name)
            if current_pool and pool != current_pool:
                # The volume moved pools: detach it from the old pool
                # before the final mapping in the finally clause.
                LOG.debug("Unmapping volume %s from %s",
                          volume.uuid, pool.name)
                self.cluster_data_model.unmap_volume(volume, current_pool)
        except exception.VolumeNotFound:
            # The instance didn't exist yet so we map it for the first time
            LOG.debug("New volume: mapping it to %s", pool.name)
        finally:
            # Always (re)map the volume to its target pool.
            if pool:
                self.cluster_data_model.map_volume(volume, pool)
                LOG.debug("Mapped volume %s to %s", volume.uuid, pool.name)
||||
def delete_volume(self, volume, pool):
|
||||
try:
|
||||
self.cluster_data_model.delete_volume(volume)
|
||||
except Exception:
|
||||
LOG.info("Volume %s already deleted", volume.uuid)
|
||||
|
||||
try:
|
||||
if pool:
|
||||
# if volume is under pool, let's update pool element.
|
||||
# get existing pool or create pool by cinder api
|
||||
pool = self.get_or_create_pool(pool.name)
|
||||
self.update_pool_by_api(pool)
|
||||
except exception.PoolNotFound as exc:
|
||||
LOG.exception(exc)
|
||||
pool = None
|
||||
|
||||
|
||||
class CapacityNotificationEndpoint(CinderNotification):
    """Endpoint handling Cinder ``capacity.pool`` notifications."""

    @property
    def filter_rule(self):
        """Cinder capacity notification filter"""
        return filtering.NotificationFilter(
            publisher_id=r'capacity.*',
            event_type='capacity.pool',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Update the matching storage pool from a capacity payload."""
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        # Pass substitutions lazily so the message is only rendered when
        # INFO-level logging is enabled (no eager % formatting).
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        name = payload['name_to_id']
        try:
            pool = self.get_or_create_pool(name)
            self.update_pool(pool, payload)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)
||||
|
||||
class VolumeNotificationEndpoint(CinderNotification):
    """Base class for the Cinder ``volume.*`` notification endpoints."""

    # Matches any publisher whose id starts with "volume"
    # (e.g. "volume.<hostname>@<backend>").
    publisher_id_regex = r'^volume.*'
|
||||
|
||||
class VolumeCreateEnd(VolumeNotificationEndpoint):
    """Endpoint handling ``volume.create.end`` notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.create.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Register/refresh the created volume in the cluster data model."""
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        # Pass substitutions lazily so the message is only rendered when
        # INFO-level logging is enabled (no eager % formatting).
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        volume_id = payload['volume_id']
        poolname = payload['host']
        volume = self.get_or_create_volume(volume_id, poolname)
        self.update_volume(volume, payload)
||||
|
||||
class VolumeUpdateEnd(VolumeNotificationEndpoint):
    """Endpoint handling ``volume.update.end`` notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.update.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Refresh the updated volume in the cluster data model."""
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        # Pass substitutions lazily so the message is only rendered when
        # INFO-level logging is enabled (no eager % formatting).
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        volume_id = payload['volume_id']
        poolname = payload['host']
        volume = self.get_or_create_volume(volume_id, poolname)
        self.update_volume(volume, payload)
||||
|
||||
class VolumeAttachEnd(VolumeUpdateEnd):
    """Endpoint for ``volume.attach.end``; reuses the update handler."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.attach.end',
        )
        return rule
||||
|
||||
class VolumeDetachEnd(VolumeUpdateEnd):
    """Endpoint for ``volume.detach.end``; reuses the update handler."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.detach.end',
        )
        return rule
||||
|
||||
class VolumeResizeEnd(VolumeUpdateEnd):
    """Endpoint for ``volume.resize.end``; reuses the update handler."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        rule = filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.resize.end',
        )
        return rule
|
||||
|
||||
class VolumeDeleteEnd(VolumeNotificationEndpoint):
    """Endpoint handling ``volume.delete.end`` notifications."""

    @property
    def filter_rule(self):
        """Cinder volume notification filter"""
        return filtering.NotificationFilter(
            publisher_id=self.publisher_id_regex,
            event_type='volume.delete.end',
        )

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Remove the deleted volume from the cluster data model."""
        ctxt.request_id = metadata['message_id']
        ctxt.project_domain = event_type
        # Pass substitutions lazily so the message is only rendered when
        # INFO-level logging is enabled (no eager % formatting).
        LOG.info("Event '%(event)s' received from %(publisher)s "
                 "with metadata %(metadata)s",
                 dict(event=event_type,
                      publisher=publisher_id,
                      metadata=metadata))
        LOG.debug(payload)
        volume_id = payload['volume_id']
        poolname = payload['host']
        volume = self.get_or_create_volume(volume_id, poolname)

        try:
            pool = self.get_or_create_pool(poolname)
        except exception.PoolNotFound as exc:
            LOG.exception(exc)
            # Without a pool the delete still proceeds; the pool refresh
            # step is simply skipped by delete_volume.
            pool = None

        self.delete_volume(volume, pool)
||||
@@ -82,6 +82,7 @@ class BaseStrategy(loadable.Loadable):
|
||||
self._osc = osc
|
||||
self._collector_manager = None
|
||||
self._compute_model = None
|
||||
self._storage_model = None
|
||||
self._input_parameters = utils.Struct()
|
||||
self._audit_scope = None
|
||||
self._audit_scope_handler = None
|
||||
@@ -192,6 +193,27 @@ class BaseStrategy(loadable.Loadable):
|
||||
|
||||
return self._compute_model
|
||||
|
||||
    @property
    def storage_model(self):
        """Cluster data model

        :returns: Cluster data model the strategy is executed on
        :rtype model: :py:class:`~.ModelRoot` instance
        """
        if self._storage_model is None:
            # Lazily build the storage model once per strategy run,
            # scoped to the audit's scope handler.
            collector = self.collector_manager.get_cluster_model_collector(
                'storage', osc=self.osc)
            self._storage_model = self.audit_scope_handler.get_scoped_model(
                collector.get_latest_cluster_data_model())

        if not self._storage_model:
            raise exception.ClusterStateNotDefined()

        if self._storage_model.stale:
            # Refuse to run a strategy against outdated cluster state.
            raise exception.ClusterStateStale()

        return self._storage_model
||||
@classmethod
|
||||
def get_schema(cls):
|
||||
"""Defines a Schema that the input parameters shall comply to
|
||||
|
||||
Reference in New Issue
Block a user