Refactored the compute model and its elements

In this changeset, I refactored the whole Watcher codebase to
adopt a naming convention about the various elements of the
Compute model so that it reflects the same naming convention
adopted by Nova.

Change-Id: I28adba5e1f27175f025330417b072686134d5f51
Partially-Implements: blueprint cluster-model-objects-wrapper
This commit is contained in:
Vincent Françoise
2016-07-06 17:44:29 +02:00
parent dbde1afea0
commit 31c37342cd
53 changed files with 1865 additions and 1803 deletions

View File

@@ -118,11 +118,11 @@ class ReleasedComputeNodesCount(IndicatorSpecification):
voluptuous.Range(min=0), required=True)
class VmMigrationsCount(IndicatorSpecification):
class InstanceMigrationsCount(IndicatorSpecification):
def __init__(self):
super(VmMigrationsCount, self).__init__(
name="vm_migrations_count",
description=_("The number of migrations to be performed."),
super(InstanceMigrationsCount, self).__init__(
name="instance_migrations_count",
description=_("The number of VM migrations to be performed."),
unit=None,
)

View File

@@ -34,14 +34,14 @@ class ServerConsolidation(base.EfficacySpecification):
def get_indicators_specifications(self):
return [
indicators.ReleasedComputeNodesCount(),
indicators.VmMigrationsCount(),
indicators.InstanceMigrationsCount(),
]
def get_global_efficacy_indicator(self, indicators_map):
value = 0
if indicators_map.vm_migrations_count > 0:
if indicators_map.instance_migrations_count > 0:
value = (float(indicators_map.released_compute_nodes_count) /
float(indicators_map.vm_migrations_count)) * 100
float(indicators_map.instance_migrations_count)) * 100
return efficacy.Indicator(
name="released_nodes_ratio",

View File

@@ -16,7 +16,6 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_config import cfg
@@ -40,7 +39,7 @@ class CollectorManager(object):
for collector_name in available_collectors:
collector = self.collector_loader.load(collector_name)
collectors[collector_name] = collector
self._collectors = collectors
self._collectors = collectors
return self._collectors

View File

@@ -20,10 +20,8 @@ from oslo_log import log
from watcher.common import nova_helper
from watcher.decision_engine.model.collector import base
from watcher.decision_engine.model import hypervisor as obj_hypervisor
from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm as obj_vm
LOG = log.getLogger(__name__)
@@ -50,45 +48,46 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
LOG.debug("Building latest Nova cluster data model")
model = model_root.ModelRoot()
mem = resource.Resource(resource.ResourceType.memory)
num_cores = resource.Resource(resource.ResourceType.cpu_cores)
disk = resource.Resource(resource.ResourceType.disk)
disk_capacity = resource.Resource(resource.ResourceType.disk_capacity)
mem = element.Resource(element.ResourceType.memory)
num_cores = element.Resource(element.ResourceType.cpu_cores)
disk = element.Resource(element.ResourceType.disk)
disk_capacity = element.Resource(element.ResourceType.disk_capacity)
model.create_resource(mem)
model.create_resource(num_cores)
model.create_resource(disk)
model.create_resource(disk_capacity)
flavor_cache = {}
hypervisors = self.wrapper.get_hypervisors_list()
for h in hypervisors:
service = self.wrapper.nova.services.find(id=h.service['id'])
# create hypervisor in cluster_model_collector
hypervisor = obj_hypervisor.Hypervisor()
hypervisor.uuid = service.host
hypervisor.hostname = h.hypervisor_hostname
nodes = self.wrapper.get_compute_node_list()
for n in nodes:
service = self.wrapper.nova.services.find(id=n.service['id'])
# create node in cluster_model_collector
node = element.ComputeNode()
node.uuid = service.host
node.hostname = n.hypervisor_hostname
# set capacity
mem.set_capacity(hypervisor, h.memory_mb)
disk.set_capacity(hypervisor, h.free_disk_gb)
disk_capacity.set_capacity(hypervisor, h.local_gb)
num_cores.set_capacity(hypervisor, h.vcpus)
hypervisor.state = h.state
hypervisor.status = h.status
model.add_hypervisor(hypervisor)
vms = self.wrapper.get_vms_by_hypervisor(str(service.host))
for v in vms:
mem.set_capacity(node, n.memory_mb)
disk.set_capacity(node, n.free_disk_gb)
disk_capacity.set_capacity(node, n.local_gb)
num_cores.set_capacity(node, n.vcpus)
node.state = n.state
node.status = n.status
model.add_node(node)
instances = self.wrapper.get_instances_by_node(str(service.host))
for v in instances:
# create VM in cluster_model_collector
vm = obj_vm.VM()
vm.uuid = v.id
# nova/nova/compute/vm_states.py
vm.state = getattr(v, 'OS-EXT-STS:vm_state')
instance = element.Instance()
instance.uuid = v.id
# nova/nova/compute/vm_states.py
instance.state = getattr(v, 'OS-EXT-STS:vm_state')
# set capacity
self.wrapper.get_flavor_instance(v, flavor_cache)
mem.set_capacity(vm, v.flavor['ram'])
disk.set_capacity(vm, v.flavor['disk'])
num_cores.set_capacity(vm, v.flavor['vcpus'])
mem.set_capacity(instance, v.flavor['ram'])
disk.set_capacity(instance, v.flavor['disk'])
num_cores.set_capacity(instance, v.flavor['vcpus'])
model.get_mapping().map(node, instance)
model.add_instance(instance)
model.get_mapping().map(hypervisor, vm)
model.add_vm(vm)
return model

View File

@@ -0,0 +1,39 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Authors: Vincent FRANCOISE <vincent.francoise@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.model.element import disk_info
from watcher.decision_engine.model.element import instance
from watcher.decision_engine.model.element import node
from watcher.decision_engine.model.element import resource
ServiceState = node.ServiceState
PowerState = node.PowerState
ComputeNode = node.ComputeNode
InstanceState = instance.InstanceState
Instance = instance.Instance
DiskInfo = disk_info.DiskInfo
ResourceType = resource.ResourceType
Resource = resource.Resource
__all__ = [
'ServiceState', 'PowerState', 'ComputeNode', 'InstanceState', 'Instance',
'DiskInfo', 'ResourceType', 'Resource']

View File

@@ -1,11 +1,13 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
# Copyright (c) 2016 b<>com
#
# Authors: Vincent FRANCOISE <vincent.francoise@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,11 +16,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import abc
import six
class HypervisorState(enum.Enum):
ONLINE = 'up'
OFFLINE = 'down'
ENABLED = 'enabled'
DISABLED = 'disabled'
@six.add_metaclass(abc.ABCMeta)
class Element(object):
@abc.abstractmethod
def accept(self, visitor):
raise NotImplementedError()

View File

@@ -14,8 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
class ComputeResource(object):
import six
from watcher.decision_engine.model.element import base
@six.add_metaclass(abc.ABCMeta)
class ComputeResource(base.Element):
def __init__(self):
self._uuid = ""

View File

@@ -14,8 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.model.element import base
class DiskInfo(base.Element):
class DiskInfo(object):
def __init__(self):
self.name = ""
self.major = 0
@@ -23,6 +26,9 @@ class DiskInfo(object):
self.size = 0
self.scheduler = ""
def accept(self, visitor):
raise NotImplementedError()
def set_size(self, size):
"""DiskInfo

View File

@@ -0,0 +1,54 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
from watcher.decision_engine.model.element import compute_resource
class InstanceState(enum.Enum):
ACTIVE = 'active' # Instance is running
BUILDING = 'building' # Instance only exists in DB
PAUSED = 'paused'
SUSPENDED = 'suspended' # Instance is suspended to disk.
STOPPED = 'stopped' # Instance is shut off, the disk image is still there.
RESCUED = 'rescued' # A rescue image is running with the original image
# attached.
RESIZED = 'resized' # an instance with the new size is active.
SOFT_DELETED = 'soft-delete'
# still available to restore.
DELETED = 'deleted' # Instance is permanently deleted.
ERROR = 'error'
class Instance(compute_resource.ComputeResource):
def __init__(self):
super(Instance, self).__init__()
self._state = InstanceState.ACTIVE.value
def accept(self, visitor):
raise NotImplementedError()
@property
def state(self):
return self._state
@state.setter
def state(self, state):
self._state = state

View File

@@ -14,17 +14,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.model import compute_resource
from watcher.decision_engine.model import hypervisor_state
from watcher.decision_engine.model import power_state
import enum
from watcher.decision_engine.model.element import compute_resource
class Hypervisor(compute_resource.ComputeResource):
class ServiceState(enum.Enum):
ONLINE = 'up'
OFFLINE = 'down'
ENABLED = 'enabled'
DISABLED = 'disabled'
class PowerState(enum.Enum):
# away mode
g0 = "g0"
# power on suspend (processor caches are flushed)
# The power to the CPU(s) and RAM is maintained
g1_S1 = "g1_S1"
# CPU powered off. Dirty cache is flushed to RAM
g1_S2 = "g1_S2"
# Suspend to RAM
g1_S3 = "g1_S3"
# Suspend to Disk
g1_S4 = "g1_S4"
# switch outlet X OFF on the PDU (Power Distribution Unit)
switch_off = "switch_off"
# switch outlet X ON on the PDU (Power Distribution Unit)
switch_on = "switch_on"
class ComputeNode(compute_resource.ComputeResource):
def __init__(self):
super(Hypervisor, self).__init__()
self._state = hypervisor_state.HypervisorState.ONLINE
self._status = hypervisor_state.HypervisorState.ENABLED
self._power_state = power_state.PowerState.g0
super(ComputeNode, self).__init__()
self._state = ServiceState.ONLINE
self._status = ServiceState.ENABLED
self._power_state = PowerState.g0
def accept(self, visitor):
raise NotImplementedError()
@property
def state(self):

View File

@@ -14,9 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log
import threading
from oslo_log import log
from watcher._i18n import _LW
LOG = log.getLogger(__name__)
@@ -25,110 +26,108 @@ LOG = log.getLogger(__name__)
class Mapping(object):
def __init__(self, model):
self.model = model
self._mapping_hypervisors = {}
self.mapping_vm = {}
self.compute_node_mapping = {}
self.instance_mapping = {}
self.lock = threading.Lock()
def map(self, hypervisor, vm):
"""Select the hypervisor where the instance is launched
def map(self, node, instance):
"""Select the node where the instance is launched
:param hypervisor: the hypervisor
:param vm: the virtual machine or instance
:param node: the node
:param instance: the virtual machine or instance
"""
try:
self.lock.acquire()
# init first
if hypervisor.uuid not in self._mapping_hypervisors.keys():
self._mapping_hypervisors[hypervisor.uuid] = []
if node.uuid not in self.compute_node_mapping.keys():
self.compute_node_mapping[node.uuid] = []
# map node => vms
self._mapping_hypervisors[hypervisor.uuid].append(
vm.uuid)
# map node => instances
self.compute_node_mapping[node.uuid].append(
instance.uuid)
# map vm => node
self.mapping_vm[vm.uuid] = hypervisor.uuid
# map instance => node
self.instance_mapping[instance.uuid] = node.uuid
finally:
self.lock.release()
def unmap(self, hypervisor, vm):
"""Remove the instance from the hypervisor
def unmap(self, node, instance):
"""Remove the instance from the node
:param hypervisor: the hypervisor
:param vm: the virtual machine or instance
:param node: the node
:param instance: the virtual machine or instance
"""
self.unmap_from_id(hypervisor.uuid, vm.uuid)
self.unmap_from_id(node.uuid, instance.uuid)
def unmap_from_id(self, node_uuid, vm_uuid):
"""Remove the instance (by id) from the hypervisor (by id)
def unmap_from_id(self, node_uuid, instance_uuid):
"""Remove the instance (by id) from the node (by id)
:param node_uuid: the uuid of the compute node
:param instance_uuid: the uuid of the instance to unmap
"""
try:
self.lock.acquire()
if str(node_uuid) in self._mapping_hypervisors:
self._mapping_hypervisors[str(node_uuid)].remove(str(vm_uuid))
# remove vm
self.mapping_vm.pop(vm_uuid)
if str(node_uuid) in self.compute_node_mapping:
self.compute_node_mapping[str(node_uuid)].remove(
str(instance_uuid))
# remove instance
self.instance_mapping.pop(instance_uuid)
else:
LOG.warning(_LW(
"trying to delete the virtual machine %(vm)s but it was "
"not found on hypervisor %(hyp)s"),
{'vm': vm_uuid, 'hyp': node_uuid})
"Trying to delete the instance %(instance)s but it was "
"not found on node %(node)s"),
{'instance': instance_uuid, 'node': node_uuid})
finally:
self.lock.release()
def get_mapping(self):
return self._mapping_hypervisors
return self.compute_node_mapping
def get_mapping_vm(self):
return self.mapping_vm
def get_node_from_instance(self, instance):
return self.get_node_from_instance_id(instance.uuid)
def get_node_from_vm(self, vm):
return self.get_node_from_vm_id(vm.uuid)
def get_node_from_instance_id(self, instance_uuid):
"""Getting host information from the guest instance
def get_node_from_vm_id(self, vm_uuid):
"""Getting host information from the guest VM
:param vm: the uuid of the instance
:return: hypervisor
:param instance_uuid: the uuid of the instance
:return: node
"""
return self.model.get_hypervisor_from_id(
self.get_mapping_vm()[str(vm_uuid)])
return self.model.get_node_from_id(
self.instance_mapping[str(instance_uuid)])
def get_node_vms(self, hypervisor):
"""Get the list of instances running on the hypervisor
def get_node_instances(self, node):
"""Get the list of instances running on the node
:param hypervisor:
:param node:
:return:
"""
return self.get_node_vms_from_id(hypervisor.uuid)
return self.get_node_instances_from_id(node.uuid)
def get_node_vms_from_id(self, node_uuid):
if str(node_uuid) in self._mapping_hypervisors.keys():
return self._mapping_hypervisors[str(node_uuid)]
def get_node_instances_from_id(self, node_uuid):
if str(node_uuid) in self.compute_node_mapping.keys():
return self.compute_node_mapping[str(node_uuid)]
else:
# empty
return []
def migrate_vm(self, vm, src_hypervisor, dest_hypervisor):
"""Migrate single instance from src_hypervisor to dest_hypervisor
def migrate_instance(self, instance, source_node, destination_node):
"""Migrate single instance from source_node to destination_node
:param vm:
:param src_hypervisor:
:param dest_hypervisor:
:param instance:
:param source_node:
:param destination_node:
:return:
"""
if src_hypervisor == dest_hypervisor:
if source_node == destination_node:
return False
# unmap
self.unmap(src_hypervisor, vm)
self.unmap(source_node, instance)
# map
self.map(dest_hypervisor, vm)
self.map(destination_node, instance)
return True

View File

@@ -17,16 +17,14 @@
from watcher._i18n import _
from watcher.common import exception
from watcher.common import utils
from watcher.decision_engine.model import hypervisor
from watcher.decision_engine.model import element
from watcher.decision_engine.model import mapping
from watcher.decision_engine.model import vm
class ModelRoot(object):
def __init__(self, stale=False):
self._hypervisors = utils.Struct()
self._vms = utils.Struct()
self._nodes = utils.Struct()
self._instances = utils.Struct()
self.mapping = mapping.Mapping(self)
self.resource = utils.Struct()
self.stale = stale
@@ -36,46 +34,46 @@ class ModelRoot(object):
__bool__ = __nonzero__
def assert_hypervisor(self, obj):
if not isinstance(obj, hypervisor.Hypervisor):
def assert_node(self, obj):
if not isinstance(obj, element.ComputeNode):
raise exception.IllegalArgumentException(
message=_("'obj' argument type is not valid"))
def assert_vm(self, obj):
if not isinstance(obj, vm.VM):
def assert_instance(self, obj):
if not isinstance(obj, element.Instance):
raise exception.IllegalArgumentException(
message=_("'obj' argument type is not valid"))
def add_hypervisor(self, hypervisor):
self.assert_hypervisor(hypervisor)
self._hypervisors[hypervisor.uuid] = hypervisor
def add_node(self, node):
self.assert_node(node)
self._nodes[node.uuid] = node
def remove_hypervisor(self, hypervisor):
self.assert_hypervisor(hypervisor)
if str(hypervisor.uuid) not in self._hypervisors.keys():
raise exception.HypervisorNotFound(hypervisor.uuid)
def remove_node(self, node):
self.assert_node(node)
if str(node.uuid) not in self._nodes:
raise exception.ComputeNodeNotFound(node.uuid)
else:
del self._hypervisors[hypervisor.uuid]
del self._nodes[node.uuid]
def add_vm(self, vm):
self.assert_vm(vm)
self._vms[vm.uuid] = vm
def add_instance(self, instance):
self.assert_instance(instance)
self._instances[instance.uuid] = instance
def get_all_hypervisors(self):
return self._hypervisors
def get_all_compute_nodes(self):
return self._nodes
def get_hypervisor_from_id(self, hypervisor_uuid):
if str(hypervisor_uuid) not in self._hypervisors.keys():
raise exception.HypervisorNotFound(hypervisor_uuid)
return self._hypervisors[str(hypervisor_uuid)]
def get_node_from_id(self, node_uuid):
if str(node_uuid) not in self._nodes:
raise exception.ComputeNodeNotFound(node_uuid)
return self._nodes[str(node_uuid)]
def get_vm_from_id(self, uuid):
if str(uuid) not in self._vms.keys():
def get_instance_from_id(self, uuid):
if str(uuid) not in self._instances:
raise exception.InstanceNotFound(name=uuid)
return self._vms[str(uuid)]
return self._instances[str(uuid)]
def get_all_vms(self):
return self._vms
def get_all_instances(self):
return self._instances
def get_mapping(self):
return self.mapping
@@ -83,5 +81,5 @@ class ModelRoot(object):
def create_resource(self, r):
self.resource[str(r.name)] = r
def get_resource_from_id(self, id):
return self.resource[str(id)]
def get_resource_from_id(self, resource_id):
return self.resource[str(resource_id)]

View File

@@ -1,31 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
class PowerState(enum.Enum):
# away mode
g0 = "g0"
# power on suspend (processor caches are flushed)
# The power to the CPU(s) and RAM is maintained
g1_S1 = "g1_S1"
# CPU powered off. Dirty cache is flushed to RAM
g1_S2 = "g1_S2"
# Suspend to RAM
g1_S3 = "g1_S3"
# Suspend to Disk
g1_S4 = "g1_S4"
# switch outlet X OFF on the PDU (Power Distribution Unit)
switch_off = "switch_off"
# switch outlet X ON on the PDU (Power Distribution Unit)
switch_on = "switch_on"

View File

@@ -1,31 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.model import compute_resource
from watcher.decision_engine.model import vm_state
class VM(compute_resource.ComputeResource):
def __init__(self):
super(VM, self).__init__()
self._state = vm_state.VMState.ACTIVE.value
@property
def state(self):
return self._state
@state.setter
def state(self, state):
self._state = state

View File

@@ -1,34 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
class VMState(enum.Enum):
ACTIVE = 'active' # VM is running
BUILDING = 'building' # VM only exists in DB
PAUSED = 'paused'
SUSPENDED = 'suspended' # VM is suspended to disk.
STOPPED = 'stopped' # VM is powered off, the disk image is still there.
RESCUED = 'rescued' # A rescue image is running with the original VM image
# attached.
RESIZED = 'resized' # a VM with the new size is active.
SOFT_DELETED = 'soft-delete'
# still available to restore.
DELETED = 'deleted' # VM is permanently deleted.
ERROR = 'error'

View File

@@ -32,9 +32,7 @@ from oslo_log import log
from watcher._i18n import _, _LE, _LI, _LW
from watcher.common import exception
from watcher.decision_engine.cluster.history import ceilometer as cch
from watcher.decision_engine.model import hypervisor_state as hyper_state
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
@@ -136,46 +134,47 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
"""
self.migration_attempts = size_cluster * self.bound_migration
def check_migration(self, src_hypervisor, dest_hypervisor, vm_to_mig):
def check_migration(self, source_node, destination_node,
instance_to_migrate):
"""Check if the migration is possible
:param src_hypervisor: the current node of the virtual machine
:param dest_hypervisor: the destination of the virtual machine
:param vm_to_mig: the virtual machine
:param source_node: the current node of the virtual machine
:param destination_node: the destination of the virtual machine
:param instance_to_migrate: the instance / virtual machine
:return: True if the there is enough place otherwise false
"""
if src_hypervisor == dest_hypervisor:
if source_node == destination_node:
return False
LOG.debug('Migrate VM %s from %s to %s',
vm_to_mig, src_hypervisor, dest_hypervisor)
LOG.debug('Migrate instance %s from %s to %s',
instance_to_migrate, source_node, destination_node)
total_cores = 0
total_disk = 0
total_mem = 0
cpu_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
element.ResourceType.cpu_cores)
disk_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.disk)
element.ResourceType.disk)
memory_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.memory)
element.ResourceType.memory)
for vm_id in self.compute_model. \
get_mapping().get_node_vms(dest_hypervisor):
vm = self.compute_model.get_vm_from_id(vm_id)
total_cores += cpu_capacity.get_capacity(vm)
total_disk += disk_capacity.get_capacity(vm)
total_mem += memory_capacity.get_capacity(vm)
for instance_id in self.compute_model. \
get_mapping().get_node_instances(destination_node):
instance = self.compute_model.get_instance_from_id(instance_id)
total_cores += cpu_capacity.get_capacity(instance)
total_disk += disk_capacity.get_capacity(instance)
total_mem += memory_capacity.get_capacity(instance)
# capacity requested by hypervisor
total_cores += cpu_capacity.get_capacity(vm_to_mig)
total_disk += disk_capacity.get_capacity(vm_to_mig)
total_mem += memory_capacity.get_capacity(vm_to_mig)
# capacity requested by the instance to migrate
total_cores += cpu_capacity.get_capacity(instance_to_migrate)
total_disk += disk_capacity.get_capacity(instance_to_migrate)
total_mem += memory_capacity.get_capacity(instance_to_migrate)
return self.check_threshold(dest_hypervisor, total_cores, total_disk,
return self.check_threshold(destination_node, total_cores, total_disk,
total_mem)
def check_threshold(self, dest_hypervisor, total_cores,
def check_threshold(self, destination_node, total_cores,
total_disk, total_mem):
"""Check threshold
@@ -183,18 +182,18 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
aggregated CPU capacity of VMs on one node to CPU capacity
of this node must not exceed the threshold value.
:param dest_hypervisor: the destination of the virtual machine
:param destination_node: the destination of the virtual machine
:param total_cores: total cores of the virtual machine
:param total_disk: total disk size used by the virtual machine
:param total_mem: total memory used by the virtual machine
:return: True if the threshold is not exceed
"""
cpu_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor)
element.ResourceType.cpu_cores).get_capacity(destination_node)
disk_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(dest_hypervisor)
element.ResourceType.disk).get_capacity(destination_node)
memory_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(dest_hypervisor)
element.ResourceType.memory).get_capacity(destination_node)
return (cpu_capacity >= total_cores * self.threshold_cores and
disk_capacity >= total_disk * self.threshold_disk and
@@ -210,7 +209,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
"""
return self.migration_attempts
def calculate_weight(self, element, total_cores_used, total_disk_used,
def calculate_weight(self, node, total_cores_used, total_disk_used,
total_memory_used):
"""Calculate weight of every resource
@@ -221,13 +220,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
:return:
"""
cpu_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(element)
element.ResourceType.cpu_cores).get_capacity(node)
disk_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(element)
element.ResourceType.disk).get_capacity(node)
memory_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(element)
element.ResourceType.memory).get_capacity(node)
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
float(cpu_capacity))
@@ -245,13 +244,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
# TODO(jed): take in account weight
return (score_cores + score_disk + score_memory) / 3
def calculate_score_node(self, hypervisor):
def calculate_score_node(self, node):
"""Calculate the score that represent the utilization level
:param hypervisor:
:return:
:param node: :py:class:`~.ComputeNode` instance
:return: Score for the given compute node
:rtype: float
"""
resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname)
resource_id = "%s_%s" % (node.uuid, node.hostname)
host_avg_cpu_util = self.ceilometer. \
statistic_aggregation(resource_id=resource_id,
meter_name=self.HOST_CPU_USAGE_METRIC_NAME,
@@ -268,11 +268,11 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
host_avg_cpu_util = 100
cpu_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(hypervisor)
element.ResourceType.cpu_cores).get_capacity(node)
total_cores_used = cpu_capacity * (host_avg_cpu_util / 100)
return self.calculate_weight(hypervisor, total_cores_used, 0, 0)
return self.calculate_weight(node, total_cores_used, 0, 0)
def calculate_migration_efficacy(self):
"""Calculate migration efficacy
@@ -286,34 +286,34 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
else:
return 0
def calculate_score_vm(self, vm):
def calculate_score_instance(self, instance):
"""Calculate Score of virtual machine
:param vm: the virtual machine
:param instance: the virtual machine
:return: score
"""
vm_cpu_utilization = self.ceilometer. \
instance_cpu_utilization = self.ceilometer. \
statistic_aggregation(
resource_id=vm.uuid,
resource_id=instance.uuid,
meter_name=self.INSTANCE_CPU_USAGE_METRIC_NAME,
period="7200",
aggregate='avg'
)
if vm_cpu_utilization is None:
if instance_cpu_utilization is None:
LOG.error(
_LE("No values returned by %(resource_id)s "
"for %(metric_name)s"),
resource_id=vm.uuid,
resource_id=instance.uuid,
metric_name=self.INSTANCE_CPU_USAGE_METRIC_NAME,
)
vm_cpu_utilization = 100
instance_cpu_utilization = 100
cpu_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(vm)
element.ResourceType.cpu_cores).get_capacity(instance)
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0)
total_cores_used = cpu_capacity * (instance_cpu_utilization / 100.0)
return self.calculate_weight(vm, total_cores_used, 0, 0)
return self.calculate_weight(instance, total_cores_used, 0, 0)
def add_change_service_state(self, resource_id, state):
parameters = {'state': state}
@@ -324,79 +324,80 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
def add_migration(self,
resource_id,
migration_type,
src_hypervisor,
dst_hypervisor):
source_node,
destination_node):
parameters = {'migration_type': migration_type,
'src_hypervisor': src_hypervisor,
'dst_hypervisor': dst_hypervisor}
'source_node': source_node,
'destination_node': destination_node}
self.solution.add_action(action_type=self.MIGRATION,
resource_id=resource_id,
input_parameters=parameters)
def score_of_nodes(self, score):
"""Calculate score of nodes based on load by VMs"""
for hypervisor_id in self.compute_model.get_all_hypervisors():
hypervisor = self.compute_model. \
get_hypervisor_from_id(hypervisor_id)
for node_id in self.compute_model.get_all_compute_nodes():
node = self.compute_model. \
get_node_from_id(node_id)
count = self.compute_model.get_mapping(). \
get_node_vms_from_id(hypervisor_id)
get_node_instances_from_id(node_id)
if len(count) > 0:
result = self.calculate_score_node(hypervisor)
result = self.calculate_score_node(node)
else:
# The hypervisor has not VMs
# The node has no instances
result = 0
if len(count) > 0:
score.append((hypervisor_id, result))
score.append((node_id, result))
return score
def node_and_vm_score(self, sorted_score, score):
def node_and_instance_score(self, sorted_score, score):
"""Get List of VMs from node"""
node_to_release = sorted_score[len(score) - 1][0]
vms_to_mig = self.compute_model.get_mapping().get_node_vms_from_id(
node_to_release)
instances_to_migrate = (
self.compute_model.mapping.get_node_instances_from_id(
node_to_release))
vm_score = []
for vm_id in vms_to_mig:
vm = self.compute_model.get_vm_from_id(vm_id)
if vm.state == vm_state.VMState.ACTIVE.value:
vm_score.append(
(vm_id, self.calculate_score_vm(vm)))
instance_score = []
for instance_id in instances_to_migrate:
instance = self.compute_model.get_instance_from_id(instance_id)
if instance.state == element.InstanceState.ACTIVE.value:
instance_score.append(
(instance_id, self.calculate_score_instance(instance)))
return node_to_release, vm_score
return node_to_release, instance_score
def create_migration_vm(self, mig_vm, mig_src_hypervisor,
mig_dst_hypervisor):
def create_migration_instance(self, mig_instance, mig_source_node,
mig_destination_node):
"""Create migration VM"""
if self.compute_model.get_mapping().migrate_vm(
mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
self.add_migration(mig_vm.uuid, 'live',
mig_src_hypervisor.uuid,
mig_dst_hypervisor.uuid)
if self.compute_model.get_mapping().migrate_instance(
mig_instance, mig_source_node, mig_destination_node):
self.add_migration(mig_instance.uuid, 'live',
mig_source_node.uuid,
mig_destination_node.uuid)
if len(self.compute_model.get_mapping().get_node_vms(
mig_src_hypervisor)) == 0:
self.add_change_service_state(mig_src_hypervisor.
if len(self.compute_model.get_mapping().get_node_instances(
mig_source_node)) == 0:
self.add_change_service_state(mig_source_node.
uuid,
hyper_state.HypervisorState.
DISABLED.value)
element.ServiceState.DISABLED.value)
self.number_of_released_nodes += 1
def calculate_num_migrations(self, sorted_vms, node_to_release,
def calculate_num_migrations(self, sorted_instances, node_to_release,
sorted_score):
number_migrations = 0
for vm in sorted_vms:
for instance in sorted_instances:
for j in range(0, len(sorted_score)):
mig_vm = self.compute_model.get_vm_from_id(vm[0])
mig_src_hypervisor = self.compute_model.get_hypervisor_from_id(
mig_instance = self.compute_model.get_instance_from_id(
instance[0])
mig_source_node = self.compute_model.get_node_from_id(
node_to_release)
mig_dst_hypervisor = self.compute_model.get_hypervisor_from_id(
mig_destination_node = self.compute_model.get_node_from_id(
sorted_score[j][0])
result = self.check_migration(
mig_src_hypervisor, mig_dst_hypervisor, mig_vm)
mig_source_node, mig_destination_node, mig_instance)
if result:
self.create_migration_vm(
mig_vm, mig_src_hypervisor, mig_dst_hypervisor)
self.create_migration_instance(
mig_instance, mig_source_node, mig_destination_node)
number_migrations += 1
break
return number_migrations
@@ -420,22 +421,20 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
unsuccessful_migration = 0
first_migration = True
size_cluster = len(self.compute_model.get_all_hypervisors())
size_cluster = len(self.compute_model.get_all_compute_nodes())
if size_cluster == 0:
raise exception.ClusterEmpty()
self.compute_attempts(size_cluster)
for hypervisor_id in self.compute_model.get_all_hypervisors():
hypervisor = self.compute_model.get_hypervisor_from_id(
hypervisor_id)
for node_id in self.compute_model.get_all_compute_nodes():
node = self.compute_model.get_node_from_id(node_id)
count = self.compute_model.get_mapping(). \
get_node_vms_from_id(hypervisor_id)
get_node_instances_from_id(node_id)
if len(count) == 0:
if hypervisor.state == hyper_state.HypervisorState.ENABLED:
self.add_change_service_state(hypervisor_id,
hyper_state.HypervisorState.
DISABLED.value)
if node.state == element.ServiceState.ENABLED:
self.add_change_service_state(
node_id, element.ServiceState.DISABLED.value)
while self.get_allowed_migration_attempts() >= unsuccessful_migration:
if not first_migration:
@@ -449,7 +448,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
# Sort compute nodes by Score decreasing
sorted_score = sorted(score, reverse=True, key=lambda x: (x[1]))
LOG.debug("Hypervisor(s) BFD %s", sorted_score)
LOG.debug("Compute node(s) BFD %s", sorted_score)
# Get Node to be released
if len(score) == 0:
@@ -458,16 +457,17 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
" of the cluster is zero"))
break
node_to_release, vm_score = self.node_and_vm_score(
node_to_release, instance_score = self.node_and_instance_score(
sorted_score, score)
# Sort VMs by Score
sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1]))
# Sort instances by Score
sorted_instances = sorted(
instance_score, reverse=True, key=lambda x: (x[1]))
# BFD: Best Fit Decrease
LOG.debug("VM(s) BFD %s", sorted_vms)
LOG.debug("VM(s) BFD %s", sorted_instances)
migrations = self.calculate_num_migrations(
sorted_vms, node_to_release, sorted_score)
sorted_instances, node_to_release, sorted_score)
unsuccessful_migration = self.unsuccessful_migration_actualization(
migrations, unsuccessful_migration)
@@ -481,5 +481,5 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
def post_execute(self):
self.solution.set_efficacy_indicators(
released_compute_nodes_count=self.number_of_released_nodes,
vm_migrations_count=self.number_of_migrations,
instance_migrations_count=self.number_of_migrations,
)

View File

@@ -30,11 +30,10 @@ telemetries to measure thermal/workload status of server.
from oslo_log import log
from watcher._i18n import _, _LI, _LW
from watcher._i18n import _, _LW, _LI
from watcher.common import exception as wexc
from watcher.decision_engine.cluster.history import ceilometer as ceil
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
@@ -122,35 +121,35 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
def ceilometer(self, c):
self._ceilometer = c
def calc_used_res(self, hypervisor, cpu_capacity,
def calc_used_res(self, node, cpu_capacity,
memory_capacity, disk_capacity):
"""Calculate the used vcpus, memory and disk based on VM flavors"""
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
instances = self.compute_model.mapping.get_node_instances(node)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
if len(vms) > 0:
for vm_id in vms:
vm = self.compute_model.get_vm_from_id(vm_id)
vcpus_used += cpu_capacity.get_capacity(vm)
memory_mb_used += memory_capacity.get_capacity(vm)
disk_gb_used += disk_capacity.get_capacity(vm)
if len(instances) > 0:
for instance_id in instances:
instance = self.compute_model.get_instance_from_id(instance_id)
vcpus_used += cpu_capacity.get_capacity(instance)
memory_mb_used += memory_capacity.get_capacity(instance)
disk_gb_used += disk_capacity.get_capacity(instance)
return vcpus_used, memory_mb_used, disk_gb_used
def group_hosts_by_outlet_temp(self):
"""Group hosts based on outlet temp meters"""
hypervisors = self.compute_model.get_all_hypervisors()
size_cluster = len(hypervisors)
nodes = self.compute_model.get_all_compute_nodes()
size_cluster = len(nodes)
if size_cluster == 0:
raise wexc.ClusterEmpty()
hosts_need_release = []
hosts_target = []
for hypervisor_id in hypervisors:
hypervisor = self.compute_model.get_hypervisor_from_id(
hypervisor_id)
resource_id = hypervisor.uuid
for node_id in nodes:
node = self.compute_model.get_node_from_id(
node_id)
resource_id = node.uuid
outlet_temp = self.ceilometer.statistic_aggregation(
resource_id=resource_id,
@@ -163,53 +162,55 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
continue
LOG.debug("%s: outlet temperature %f" % (resource_id, outlet_temp))
hvmap = {'hv': hypervisor, 'outlet_temp': outlet_temp}
instance_data = {'node': node, 'outlet_temp': outlet_temp}
if outlet_temp >= self.threshold:
# mark the hypervisor to release resources
hosts_need_release.append(hvmap)
# mark the node to release resources
hosts_need_release.append(instance_data)
else:
hosts_target.append(hvmap)
hosts_target.append(instance_data)
return hosts_need_release, hosts_target
def choose_vm_to_migrate(self, hosts):
"""Pick up an active vm instance to migrate from provided hosts"""
for hvmap in hosts:
mig_src_hypervisor = hvmap['hv']
vms_of_src = self.compute_model.get_mapping().get_node_vms(
mig_src_hypervisor)
if len(vms_of_src) > 0:
for vm_id in vms_of_src:
def choose_instance_to_migrate(self, hosts):
"""Pick up an active instance to migrate from provided hosts"""
for instance_data in hosts:
mig_source_node = instance_data['node']
instances_of_src = self.compute_model.mapping.get_node_instances(
mig_source_node)
if len(instances_of_src) > 0:
for instance_id in instances_of_src:
try:
# select the first active VM to migrate
vm = self.compute_model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.info(_LI("VM not active, skipped: %s"),
vm.uuid)
# select the first active instance to migrate
instance = self.compute_model.get_instance_from_id(
instance_id)
if (instance.state !=
element.InstanceState.ACTIVE.value):
LOG.info(_LI("Instance not active, skipped: %s"),
instance.uuid)
continue
return mig_src_hypervisor, vm
return mig_source_node, instance
except wexc.InstanceNotFound as e:
LOG.exception(e)
LOG.info(_LI("VM not found"))
LOG.info(_LI("Instance not found"))
return None
def filter_dest_servers(self, hosts, vm_to_migrate):
def filter_dest_servers(self, hosts, instance_to_migrate):
"""Only return hosts with sufficient available resources"""
cpu_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
element.ResourceType.cpu_cores)
disk_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.disk)
element.ResourceType.disk)
memory_capacity = self.compute_model.get_resource_from_id(
resource.ResourceType.memory)
element.ResourceType.memory)
required_cores = cpu_capacity.get_capacity(vm_to_migrate)
required_disk = disk_capacity.get_capacity(vm_to_migrate)
required_memory = memory_capacity.get_capacity(vm_to_migrate)
required_cores = cpu_capacity.get_capacity(instance_to_migrate)
required_disk = disk_capacity.get_capacity(instance_to_migrate)
required_memory = memory_capacity.get_capacity(instance_to_migrate)
# filter hypervisors without enough resource
# filter nodes without enough resource
dest_servers = []
for hvmap in hosts:
host = hvmap['hv']
for instance_data in hosts:
host = instance_data['node']
# available
cores_used, mem_used, disk_used = self.calc_used_res(
host, cpu_capacity, memory_capacity, disk_capacity)
@@ -219,7 +220,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
if cores_available >= required_cores \
and disk_available >= required_disk \
and mem_available >= required_memory:
dest_servers.append(hvmap)
dest_servers.append(instance_data)
return dest_servers
@@ -251,13 +252,14 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
reverse=True,
key=lambda x: (x["outlet_temp"]))
vm_to_migrate = self.choose_vm_to_migrate(hosts_need_release)
# calculate the vm's cpu cores,memory,disk needs
if vm_to_migrate is None:
instance_to_migrate = self.choose_instance_to_migrate(
hosts_need_release)
# calculate the instance's cpu cores,memory,disk needs
if instance_to_migrate is None:
return self.solution
mig_src_hypervisor, vm_src = vm_to_migrate
dest_servers = self.filter_dest_servers(hosts_target, vm_src)
mig_source_node, instance_src = instance_to_migrate
dest_servers = self.filter_dest_servers(hosts_target, instance_src)
# sort the filtered result by outlet temp
# pick up the lowest one as dest server
if len(dest_servers) == 0:
@@ -268,15 +270,15 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
dest_servers = sorted(dest_servers, key=lambda x: (x["outlet_temp"]))
# always use the host with lowerest outlet temperature
mig_dst_hypervisor = dest_servers[0]['hv']
# generate solution to migrate the vm to the dest server,
if self.compute_model.get_mapping().migrate_vm(
vm_src, mig_src_hypervisor, mig_dst_hypervisor):
mig_destination_node = dest_servers[0]['node']
# generate solution to migrate the instance to the dest server,
if self.compute_model.mapping.migrate_instance(
instance_src, mig_source_node, mig_destination_node):
parameters = {'migration_type': 'live',
'src_hypervisor': mig_src_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid}
'source_node': mig_source_node.uuid,
'destination_node': mig_destination_node.uuid}
self.solution.add_action(action_type=self.MIGRATION,
resource_id=vm_src.uuid,
resource_id=instance_src.uuid,
input_parameters=parameters)
def post_execute(self):

View File

@@ -21,8 +21,7 @@ from oslo_log import log
from watcher._i18n import _, _LE, _LI, _LW
from watcher.common import exception as wexc
from watcher.decision_engine.cluster.history import ceilometer as ceil
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
@@ -121,20 +120,20 @@ class UniformAirflow(base.BaseStrategy):
return {
"properties": {
"threshold_airflow": {
"description": "airflow threshold for migration, Unit is\
0.1CFM",
"description": ("airflow threshold for migration, Unit is "
"0.1CFM"),
"type": "number",
"default": 400.0
},
"threshold_inlet_t": {
"description": "inlet temperature threshold for migration\
decision",
"description": ("inlet temperature threshold for "
"migration decision"),
"type": "number",
"default": 28.0
},
"threshold_power": {
"description": "system power threshold for migration\
decision",
"description": ("system power threshold for migration "
"decision"),
"type": "number",
"default": 350.0
},
@@ -146,112 +145,120 @@ class UniformAirflow(base.BaseStrategy):
},
}
def calculate_used_resource(self, hypervisor, cap_cores, cap_mem,
cap_disk):
"""Calculate the used vcpus, memory and disk based on VM flavors"""
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
def calculate_used_resource(self, node, cap_cores, cap_mem, cap_disk):
"""Compute the used vcpus, memory and disk based on instance flavors"""
instances = self.compute_model.mapping.get_node_instances(node)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for vm_id in vms:
vm = self.compute_model.get_vm_from_id(vm_id)
vcpus_used += cap_cores.get_capacity(vm)
memory_mb_used += cap_mem.get_capacity(vm)
disk_gb_used += cap_disk.get_capacity(vm)
for instance_id in instances:
instance = self.compute_model.get_instance_from_id(
instance_id)
vcpus_used += cap_cores.get_capacity(instance)
memory_mb_used += cap_mem.get_capacity(instance)
disk_gb_used += cap_disk.get_capacity(instance)
return vcpus_used, memory_mb_used, disk_gb_used
def choose_vm_to_migrate(self, hosts):
"""Pick up an active vm instance to migrate from provided hosts
def choose_instance_to_migrate(self, hosts):
"""Pick up an active instance instance to migrate from provided hosts
:param hosts: the array of dict which contains hypervisor object
:param hosts: the array of dict which contains node object
"""
vms_tobe_migrate = []
for hvmap in hosts:
source_hypervisor = hvmap['hv']
source_vms = self.compute_model.get_mapping().get_node_vms(
source_hypervisor)
if source_vms:
instances_tobe_migrate = []
for nodemap in hosts:
source_node = nodemap['node']
source_instances = self.compute_model.mapping.get_node_instances(
source_node)
if source_instances:
inlet_t = self.ceilometer.statistic_aggregation(
resource_id=source_hypervisor.uuid,
resource_id=source_node.uuid,
meter_name=self.meter_name_inlet_t,
period=self._period,
aggregate='avg')
power = self.ceilometer.statistic_aggregation(
resource_id=source_hypervisor.uuid,
resource_id=source_node.uuid,
meter_name=self.meter_name_power,
period=self._period,
aggregate='avg')
if (power < self.threshold_power and
inlet_t < self.threshold_inlet_t):
# hardware issue, migrate all vms from this hypervisor
for vm_id in source_vms:
# hardware issue, migrate all instances from this node
for instance_id in source_instances:
try:
vm = self.compute_model.get_vm_from_id(vm_id)
vms_tobe_migrate.append(vm)
instance = (self.compute_model.
get_instance_from_id(instance_id))
instances_tobe_migrate.append(instance)
except wexc.InstanceNotFound:
LOG.error(_LE("VM not found; error: %s"), vm_id)
return source_hypervisor, vms_tobe_migrate
LOG.error(_LE("Instance not found; error: %s"),
instance_id)
return source_node, instances_tobe_migrate
else:
# migrate the first active vm
for vm_id in source_vms:
# migrate the first active instance
for instance_id in source_instances:
try:
vm = self.compute_model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.info(_LI("VM not active; skipped: %s"),
vm.uuid)
instance = (self.compute_model.
get_instance_from_id(instance_id))
if (instance.state !=
element.InstanceState.ACTIVE.value):
LOG.info(
_LI("Instance not active, skipped: %s"),
instance.uuid)
continue
vms_tobe_migrate.append(vm)
return source_hypervisor, vms_tobe_migrate
instances_tobe_migrate.append(instance)
return source_node, instances_tobe_migrate
except wexc.InstanceNotFound:
LOG.error(_LE("VM not found; error: %s"), vm_id)
LOG.error(_LE("Instance not found; error: %s"),
instance_id)
else:
LOG.info(_LI("VM not found on hypervisor: %s"),
source_hypervisor.uuid)
LOG.info(_LI("Instance not found on node: %s"),
source_node.uuid)
def filter_destination_hosts(self, hosts, vms_to_migrate):
"""Return vm and host with sufficient available resources"""
def filter_destination_hosts(self, hosts, instances_to_migrate):
"""Find instance and host with sufficient available resources"""
cap_cores = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
element.ResourceType.cpu_cores)
cap_disk = self.compute_model.get_resource_from_id(
resource.ResourceType.disk)
element.ResourceType.disk)
cap_mem = self.compute_model.get_resource_from_id(
resource.ResourceType.memory)
# large vm go first
vms_to_migrate = sorted(vms_to_migrate, reverse=True,
key=lambda x: (cap_cores.get_capacity(x)))
# find hosts for VMs
element.ResourceType.memory)
# large instance go first
instances_to_migrate = sorted(
instances_to_migrate, reverse=True,
key=lambda x: (cap_cores.get_capacity(x)))
# find hosts for instances
destination_hosts = []
for vm_to_migrate in vms_to_migrate:
required_cores = cap_cores.get_capacity(vm_to_migrate)
required_disk = cap_disk.get_capacity(vm_to_migrate)
required_mem = cap_mem.get_capacity(vm_to_migrate)
for instance_to_migrate in instances_to_migrate:
required_cores = cap_cores.get_capacity(instance_to_migrate)
required_disk = cap_disk.get_capacity(instance_to_migrate)
required_mem = cap_mem.get_capacity(instance_to_migrate)
dest_migrate_info = {}
for hvmap in hosts:
host = hvmap['hv']
if 'cores_used' not in hvmap:
for nodemap in hosts:
host = nodemap['node']
if 'cores_used' not in nodemap:
# calculate the available resources
hvmap['cores_used'], hvmap['mem_used'],\
hvmap['disk_used'] = self.calculate_used_resource(
nodemap['cores_used'], nodemap['mem_used'],\
nodemap['disk_used'] = self.calculate_used_resource(
host, cap_cores, cap_mem, cap_disk)
cores_available = (cap_cores.get_capacity(host) -
hvmap['cores_used'])
nodemap['cores_used'])
disk_available = (cap_disk.get_capacity(host) -
hvmap['disk_used'])
mem_available = cap_mem.get_capacity(host) - hvmap['mem_used']
nodemap['disk_used'])
mem_available = (
cap_mem.get_capacity(host) - nodemap['mem_used'])
if (cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem):
dest_migrate_info['vm'] = vm_to_migrate
dest_migrate_info['hv'] = host
hvmap['cores_used'] += required_cores
hvmap['mem_used'] += required_mem
hvmap['disk_used'] += required_disk
dest_migrate_info['instance'] = instance_to_migrate
dest_migrate_info['node'] = host
nodemap['cores_used'] += required_cores
nodemap['mem_used'] += required_mem
nodemap['disk_used'] += required_disk
destination_hosts.append(dest_migrate_info)
break
# check if all vms have target hosts
if len(destination_hosts) != len(vms_to_migrate):
# check if all instances have target hosts
if len(destination_hosts) != len(instances_to_migrate):
LOG.warning(_LW("Not all target hosts could be found; it might "
"be because there is not enough resource"))
return None
@@ -260,15 +267,15 @@ class UniformAirflow(base.BaseStrategy):
def group_hosts_by_airflow(self):
"""Group hosts based on airflow meters"""
hypervisors = self.compute_model.get_all_hypervisors()
if not hypervisors:
nodes = self.compute_model.get_all_compute_nodes()
if not nodes:
raise wexc.ClusterEmpty()
overload_hosts = []
nonoverload_hosts = []
for hypervisor_id in hypervisors:
hypervisor = self.compute_model.get_hypervisor_from_id(
hypervisor_id)
resource_id = hypervisor.uuid
for node_id in nodes:
node = self.compute_model.get_node_from_id(
node_id)
resource_id = node.uuid
airflow = self.ceilometer.statistic_aggregation(
resource_id=resource_id,
meter_name=self.meter_name_airflow,
@@ -280,12 +287,12 @@ class UniformAirflow(base.BaseStrategy):
continue
LOG.debug("%s: airflow %f" % (resource_id, airflow))
hvmap = {'hv': hypervisor, 'airflow': airflow}
nodemap = {'node': node, 'airflow': airflow}
if airflow >= self.threshold_airflow:
# mark the hypervisor to release resources
overload_hosts.append(hvmap)
# mark the node to release resources
overload_hosts.append(nodemap)
else:
nonoverload_hosts.append(hvmap)
nonoverload_hosts.append(nodemap)
return overload_hosts, nonoverload_hosts
def pre_execute(self):
@@ -299,49 +306,48 @@ class UniformAirflow(base.BaseStrategy):
self.threshold_inlet_t = self.input_parameters.threshold_inlet_t
self.threshold_power = self.input_parameters.threshold_power
self._period = self.input_parameters.period
src_hypervisors, target_hypervisors = (
self.group_hosts_by_airflow())
source_nodes, target_nodes = self.group_hosts_by_airflow()
if not src_hypervisors:
if not source_nodes:
LOG.debug("No hosts require optimization")
return self.solution
if not target_hypervisors:
if not target_nodes:
LOG.warning(_LW("No hosts currently have airflow under %s, "
"therefore there are no possible target "
"hosts for any migration"),
self.threshold_airflow)
return self.solution
# migrate the vm from server with largest airflow first
src_hypervisors = sorted(src_hypervisors,
reverse=True,
key=lambda x: (x["airflow"]))
vms_to_migrate = self.choose_vm_to_migrate(src_hypervisors)
if not vms_to_migrate:
# migrate the instance from server with largest airflow first
source_nodes = sorted(source_nodes,
reverse=True,
key=lambda x: (x["airflow"]))
instances_to_migrate = self.choose_instance_to_migrate(source_nodes)
if not instances_to_migrate:
return self.solution
source_hypervisor, vms_src = vms_to_migrate
source_node, instances_src = instances_to_migrate
# sort host with airflow
target_hypervisors = sorted(target_hypervisors,
key=lambda x: (x["airflow"]))
# find the hosts that have enough resource for the VM to be migrated
destination_hosts = self.filter_destination_hosts(target_hypervisors,
vms_src)
target_nodes = sorted(target_nodes, key=lambda x: (x["airflow"]))
# find the hosts that have enough resource
# for the instance to be migrated
destination_hosts = self.filter_destination_hosts(
target_nodes, instances_src)
if not destination_hosts:
LOG.warning(_LW("No target host could be found; it might "
"be because there is not enough resources"))
return self.solution
# generate solution to migrate the vm to the dest server,
# generate solution to migrate the instance to the dest server,
for info in destination_hosts:
vm_src = info['vm']
mig_dst_hypervisor = info['hv']
if self.compute_model.get_mapping().migrate_vm(
vm_src, source_hypervisor, mig_dst_hypervisor):
instance = info['instance']
destination_node = info['node']
if self.compute_model.mapping.migrate_instance(
instance, source_node, destination_node):
parameters = {'migration_type': 'live',
'src_hypervisor': source_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid}
'source_node': source_node.uuid,
'destination_node': destination_node.uuid}
self.solution.add_action(action_type=self.MIGRATION,
resource_id=vm_src.uuid,
resource_id=instance.uuid,
input_parameters=parameters)
def post_execute(self):

View File

@@ -24,9 +24,7 @@ from watcher._i18n import _, _LE, _LI
from watcher.common import exception
from watcher.decision_engine.cluster.history import ceilometer \
as ceilometer_cluster_history
from watcher.decision_engine.model import hypervisor_state as hyper_state
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
@@ -48,26 +46,26 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
* Offload phase - handling over-utilized resources
* Consolidation phase - handling under-utilized resources
* Solution optimization - reducing number of migrations
* Disability of unused hypervisors
* Disability of unused compute nodes
A capacity coefficients (cc) might be used to adjust optimization
thresholds. Different resources may require different coefficient
values as well as setting up different coefficient values in both
phases may lead to more efficient consolidation in the end.
phases may lead to to more efficient consolidation in the end.
If the cc equals 1 the full resource capacity may be used, cc
values lower than 1 will lead to resource under utilization and
values higher than 1 will lead to resource overbooking.
e.g. If targeted utilization is 80 percent of hypervisor capacity,
e.g. If targeted utilization is 80 percent of a compute node capacity,
the coefficient in the consolidation phase will be 0.8, but
may any lower value in the offloading phase. The lower it gets
the cluster will appear more released (distributed) for the
following consolidation phase.
As this strategy leverages VM live migration to move the load
from one hypervisor to another, this feature needs to be set up
correctly on all hypervisors within the cluster.
As this strategy laverages VM live migration to move the load
from one compute node to another, this feature needs to be set up
correctly on all compute nodes within the cluster.
This strategy assumes it is possible to live migrate any VM from
an active hypervisor to any other active hypervisor.
an active compute node to any other active compute node.
*Requirements*
@@ -86,8 +84,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
super(VMWorkloadConsolidation, self).__init__(config, osc)
self._ceilometer = None
self.number_of_migrations = 0
self.number_of_released_hypervisors = 0
self.ceilometer_vm_data_cache = dict()
self.number_of_released_nodes = 0
self.ceilometer_instance_data_cache = dict()
@classmethod
def get_name(cls):
@@ -119,200 +117,203 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"""
if isinstance(state, six.string_types):
return state
elif isinstance(state, (vm_state.VMState,
hyper_state.HypervisorState)):
elif isinstance(state, (element.InstanceState, element.ServiceState)):
return state.value
else:
LOG.error(_LE('Unexpected resource state type, '
LOG.error(_LE('Unexpexted resource state type, '
'state=%(state)s, state_type=%(st)s.'),
state=state,
st=type(state))
raise exception.WatcherException
def add_action_enable_hypervisor(self, hypervisor):
"""Add an action for hypervisor enabler into the solution.
def add_action_enable_compute_node(self, node):
"""Add an action for node enabler into the solution.
:param hypervisor: hypervisor object
:param node: node object
:return: None
"""
params = {'state': hyper_state.HypervisorState.ENABLED.value}
params = {'state': element.ServiceState.ENABLED.value}
self.solution.add_action(
action_type='change_nova_service_state',
resource_id=hypervisor.uuid,
resource_id=node.uuid,
input_parameters=params)
self.number_of_released_hypervisors -= 1
self.number_of_released_nodes -= 1
def add_action_disable_hypervisor(self, hypervisor):
"""Add an action for hypervisor disablity into the solution.
def add_action_disable_node(self, node):
"""Add an action for node disablity into the solution.
:param hypervisor: hypervisor object
:param node: node object
:return: None
"""
params = {'state': hyper_state.HypervisorState.DISABLED.value}
params = {'state': element.ServiceState.DISABLED.value}
self.solution.add_action(
action_type='change_nova_service_state',
resource_id=hypervisor.uuid,
resource_id=node.uuid,
input_parameters=params)
self.number_of_released_hypervisors += 1
self.number_of_released_nodes += 1
def add_migration(self, vm_uuid, src_hypervisor,
dst_hypervisor, model):
def add_migration(self, instance_uuid, source_node,
destination_node, model):
"""Add an action for VM migration into the solution.
:param vm_uuid: vm uuid
:param src_hypervisor: hypervisor object
:param dst_hypervisor: hypervisor object
:param instance_uuid: instance uuid
:param source_node: node object
:param destination_node: node object
:param model: model_root object
:return: None
"""
vm = model.get_vm_from_id(vm_uuid)
instance = model.get_instance_from_id(instance_uuid)
vm_state_str = self.get_state_str(vm.state)
if vm_state_str != vm_state.VMState.ACTIVE.value:
instance_state_str = self.get_state_str(instance.state)
if instance_state_str != element.InstanceState.ACTIVE.value:
# Watcher curently only supports live VM migration and block live
# VM migration which both requires migrated VM to be active.
# When supported, the cold migration may be used as a fallback
# migration mechanism to move non active VMs.
LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, '
'state=%(vm_state)s.'),
vm_uuid=vm_uuid,
vm_state=vm_state_str)
LOG.error(
_LE('Cannot live migrate: instance_uuid=%(instance_uuid)s, '
'state=%(instance_state)s.'),
instance_uuid=instance_uuid,
instance_state=instance_state_str)
raise exception.WatcherException
migration_type = 'live'
dst_hyper_state_str = self.get_state_str(dst_hypervisor.state)
if dst_hyper_state_str == hyper_state.HypervisorState.DISABLED.value:
self.add_action_enable_hypervisor(dst_hypervisor)
model.get_mapping().unmap(src_hypervisor, vm)
model.get_mapping().map(dst_hypervisor, vm)
destination_node_state_str = self.get_state_str(destination_node.state)
if destination_node_state_str == element.ServiceState.DISABLED.value:
self.add_action_enable_compute_node(destination_node)
model.mapping.unmap(source_node, instance)
model.mapping.map(destination_node, instance)
params = {'migration_type': migration_type,
'src_hypervisor': src_hypervisor.uuid,
'dst_hypervisor': dst_hypervisor.uuid}
'source_node': source_node.uuid,
'destination_node': destination_node.uuid}
self.solution.add_action(action_type='migrate',
resource_id=vm.uuid,
resource_id=instance.uuid,
input_parameters=params)
self.number_of_migrations += 1
def disable_unused_hypervisors(self, model):
"""Generate actions for disablity of unused hypervisors.
def disable_unused_nodes(self, model):
"""Generate actions for disablity of unused nodes.
:param model: model_root object
:return: None
"""
for hypervisor in model.get_all_hypervisors().values():
if (len(model.get_mapping().get_node_vms(hypervisor)) == 0 and
hypervisor.status !=
hyper_state.HypervisorState.DISABLED.value):
self.add_action_disable_hypervisor(hypervisor)
for node in model.get_all_compute_nodes().values():
if (len(model.mapping.get_node_instances(node)) == 0 and
node.status !=
element.ServiceState.DISABLED.value):
self.add_action_disable_node(node)
def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'):
def get_instance_utilization(self, instance_uuid, model,
period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a VM.
:param vm_uuid: vm object
:param instance_uuid: instance object
:param model: model_root object
:param period: seconds
:param aggr: string
:return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
"""
if vm_uuid in self.ceilometer_vm_data_cache.keys():
return self.ceilometer_vm_data_cache.get(vm_uuid)
if instance_uuid in self.ceilometer_instance_data_cache.keys():
return self.ceilometer_instance_data_cache.get(instance_uuid)
cpu_util_metric = 'cpu_util'
ram_util_metric = 'memory.usage'
ram_alloc_metric = 'memory'
disk_alloc_metric = 'disk.root.size'
vm_cpu_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=cpu_util_metric,
instance_cpu_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=cpu_util_metric,
period=period, aggregate=aggr)
vm_cpu_cores = model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(
model.get_vm_from_id(vm_uuid))
instance_cpu_cores = model.get_resource_from_id(
element.ResourceType.cpu_cores).get_capacity(
model.get_instance_from_id(instance_uuid))
if vm_cpu_util:
total_cpu_utilization = vm_cpu_cores * (vm_cpu_util / 100.0)
if instance_cpu_util:
total_cpu_utilization = (
instance_cpu_cores * (instance_cpu_util / 100.0))
else:
total_cpu_utilization = vm_cpu_cores
total_cpu_utilization = instance_cpu_cores
vm_ram_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=ram_util_metric,
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=ram_util_metric,
period=period, aggregate=aggr)
if not vm_ram_util:
vm_ram_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=ram_alloc_metric,
if not instance_ram_util:
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=ram_alloc_metric,
period=period, aggregate=aggr)
vm_disk_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=disk_alloc_metric,
instance_disk_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=disk_alloc_metric,
period=period, aggregate=aggr)
if not vm_ram_util or not vm_disk_util:
if not instance_ram_util or not instance_disk_util:
LOG.error(
_LE('No values returned by %(resource_id)s '
'for memory.usage or disk.root.size'),
resource_id=vm_uuid
resource_id=instance_uuid
)
raise exception.NoDataFound
self.ceilometer_vm_data_cache[vm_uuid] = dict(
cpu=total_cpu_utilization, ram=vm_ram_util, disk=vm_disk_util)
return self.ceilometer_vm_data_cache.get(vm_uuid)
self.ceilometer_instance_data_cache[instance_uuid] = dict(
cpu=total_cpu_utilization, ram=instance_ram_util,
disk=instance_disk_util)
return self.ceilometer_instance_data_cache.get(instance_uuid)
def get_hypervisor_utilization(self, hypervisor, model, period=3600,
aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a hypervisor.
def get_node_utilization(self, node, model, period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a node.
:param hypervisor: hypervisor object
:param node: node object
:param model: model_root object
:param period: seconds
:param aggr: string
:return: dict(cpu(number of cores used), ram(MB used), disk(B used))
"""
hypervisor_vms = model.get_mapping().get_node_vms_from_id(
hypervisor.uuid)
hypervisor_ram_util = 0
hypervisor_disk_util = 0
hypervisor_cpu_util = 0
for vm_uuid in hypervisor_vms:
vm_util = self.get_vm_utilization(vm_uuid, model, period, aggr)
hypervisor_cpu_util += vm_util['cpu']
hypervisor_ram_util += vm_util['ram']
hypervisor_disk_util += vm_util['disk']
node_instances = model.mapping.get_node_instances_from_id(
node.uuid)
node_ram_util = 0
node_disk_util = 0
node_cpu_util = 0
for instance_uuid in node_instances:
instance_util = self.get_instance_utilization(
instance_uuid, model, period, aggr)
node_cpu_util += instance_util['cpu']
node_ram_util += instance_util['ram']
node_disk_util += instance_util['disk']
return dict(cpu=hypervisor_cpu_util, ram=hypervisor_ram_util,
disk=hypervisor_disk_util)
return dict(cpu=node_cpu_util, ram=node_ram_util,
disk=node_disk_util)
def get_hypervisor_capacity(self, hypervisor, model):
"""Collect cpu, ram and disk capacity of a hypervisor.
def get_node_capacity(self, node, model):
"""Collect cpu, ram and disk capacity of a node.
:param hypervisor: hypervisor object
:param node: node object
:param model: model_root object
:return: dict(cpu(cores), ram(MB), disk(B))
"""
hypervisor_cpu_capacity = model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(hypervisor)
node_cpu_capacity = model.get_resource_from_id(
element.ResourceType.cpu_cores).get_capacity(node)
hypervisor_disk_capacity = model.get_resource_from_id(
resource.ResourceType.disk_capacity).get_capacity(hypervisor)
node_disk_capacity = model.get_resource_from_id(
element.ResourceType.disk_capacity).get_capacity(node)
hypervisor_ram_capacity = model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(hypervisor)
return dict(cpu=hypervisor_cpu_capacity, ram=hypervisor_ram_capacity,
disk=hypervisor_disk_capacity)
node_ram_capacity = model.get_resource_from_id(
element.ResourceType.memory).get_capacity(node)
return dict(cpu=node_cpu_capacity, ram=node_ram_capacity,
disk=node_disk_capacity)
def get_relative_hypervisor_utilization(self, hypervisor, model):
"""Return relative hypervisor utilization (rhu).
def get_relative_node_utilization(self, node, model):
"""Return relative node utilization (rhu).
:param hypervisor: hypervisor object
:param node: node object
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
rhu = {}
util = self.get_hypervisor_utilization(hypervisor, model)
cap = self.get_hypervisor_capacity(hypervisor, model)
util = self.get_node_utilization(node, model)
cap = self.get_node_capacity(node, model)
for k in util.keys():
rhu[k] = float(util[k]) / float(cap[k])
return rhu
@@ -320,18 +321,18 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
def get_relative_cluster_utilization(self, model):
"""Calculate relative cluster utilization (rcu).
RCU is an average of relative utilizations (rhu) of active hypervisors.
RCU is an average of relative utilizations (rhu) of active nodes.
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
hypervisors = model.get_all_hypervisors().values()
nodes = model.get_all_compute_nodes().values()
rcu = {}
counters = {}
for hypervisor in hypervisors:
hyper_state_str = self.get_state_str(hypervisor.state)
if hyper_state_str == hyper_state.HypervisorState.ENABLED.value:
rhu = self.get_relative_hypervisor_utilization(
hypervisor, model)
for node in nodes:
node_state_str = self.get_state_str(node.state)
if node_state_str == element.ServiceState.ENABLED.value:
rhu = self.get_relative_node_utilization(
node, model)
for k in rhu.keys():
if k not in rcu:
rcu[k] = 0
@@ -343,42 +344,43 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
rcu[k] /= counters[k]
return rcu
def is_overloaded(self, hypervisor, model, cc):
"""Indicate whether a hypervisor is overloaded.
def is_overloaded(self, node, model, cc):
"""Indicate whether a node is overloaded.
This considers provided resource capacity coefficients (cc).
:param hypervisor: hypervisor object
:param node: node object
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
:return: [True, False]
"""
hypervisor_capacity = self.get_hypervisor_capacity(hypervisor, model)
hypervisor_utilization = self.get_hypervisor_utilization(
hypervisor, model)
node_capacity = self.get_node_capacity(node, model)
node_utilization = self.get_node_utilization(
node, model)
metrics = ['cpu']
for m in metrics:
if hypervisor_utilization[m] > hypervisor_capacity[m] * cc[m]:
if node_utilization[m] > node_capacity[m] * cc[m]:
return True
return False
def vm_fits(self, vm_uuid, hypervisor, model, cc):
"""Indicate whether is a hypervisor able to accommodate a VM.
def instance_fits(self, instance_uuid, node, model, cc):
"""Indicate whether is a node able to accommodate a VM.
This considers provided resource capacity coefficients (cc).
:param vm_uuid: string
:param hypervisor: hypervisor object
:param instance_uuid: string
:param node: node object
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
:return: [True, False]
"""
hypervisor_capacity = self.get_hypervisor_capacity(hypervisor, model)
hypervisor_utilization = self.get_hypervisor_utilization(
hypervisor, model)
vm_utilization = self.get_vm_utilization(vm_uuid, model)
node_capacity = self.get_node_capacity(node, model)
node_utilization = self.get_node_utilization(
node, model)
instance_utilization = self.get_instance_utilization(
instance_uuid, model)
metrics = ['cpu', 'ram', 'disk']
for m in metrics:
if (vm_utilization[m] + hypervisor_utilization[m] >
hypervisor_capacity[m] * cc[m]):
if (instance_utilization[m] + node_utilization[m] >
node_capacity[m] * cc[m]):
return False
return True
@@ -391,7 +393,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
* A->B, B->C => replace migrations A->B, B->C with
a single migration A->C as both solution result in
VM running on hypervisor C which can be achieved with
VM running on node C which can be achieved with
one migration instead of two.
* A->B, B->A => remove A->B and B->A as they do not result
in a new VM placement.
@@ -401,58 +403,59 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
migrate_actions = (
a for a in self.solution.actions if a[
'action_type'] == 'migrate')
vm_to_be_migrated = (a['input_parameters']['resource_id']
for a in migrate_actions)
vm_uuids = list(set(vm_to_be_migrated))
for vm_uuid in vm_uuids:
instance_to_be_migrated = (
a['input_parameters']['resource_id'] for a in migrate_actions)
instance_uuids = list(set(instance_to_be_migrated))
for instance_uuid in instance_uuids:
actions = list(
a for a in self.solution.actions if a[
'input_parameters'][
'resource_id'] == vm_uuid)
'resource_id'] == instance_uuid)
if len(actions) > 1:
src = actions[0]['input_parameters']['src_hypervisor']
dst = actions[-1]['input_parameters']['dst_hypervisor']
src = actions[0]['input_parameters']['source_node']
dst = actions[-1]['input_parameters']['destination_node']
for a in actions:
self.solution.actions.remove(a)
self.number_of_migrations -= 1
if src != dst:
self.add_migration(vm_uuid, src, dst, model)
self.add_migration(instance_uuid, src, dst, model)
def offload_phase(self, model, cc):
"""Perform offloading phase.
This considers provided resource capacity coefficients.
Offload phase performing first-fit based bin packing to offload
overloaded hypervisors. This is done in a fashion of moving
overloaded nodes. This is done in a fashion of moving
the least CPU utilized VM first as live migration these
generaly causes less troubles. This phase results in a cluster
with no overloaded hypervisors.
* This phase is be able to enable disabled hypervisors (if needed
with no overloaded nodes.
* This phase is be able to enable disabled nodes (if needed
and any available) in the case of the resource capacity provided by
active hypervisors is not able to accomodate all the load.
active nodes is not able to accomodate all the load.
As the offload phase is later followed by the consolidation phase,
the hypervisor enabler in this phase doesn't necessarily results
in more enabled hypervisors in the final solution.
the node enabler in this phase doesn't necessarily results
in more enabled nodes in the final solution.
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
"""
sorted_hypervisors = sorted(
model.get_all_hypervisors().values(),
key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
for hypervisor in reversed(sorted_hypervisors):
if self.is_overloaded(hypervisor, model, cc):
for vm in sorted(
model.get_mapping().get_node_vms(hypervisor),
key=lambda x: self.get_vm_utilization(
sorted_nodes = sorted(
model.get_all_compute_nodes().values(),
key=lambda x: self.get_node_utilization(x, model)['cpu'])
for node in reversed(sorted_nodes):
if self.is_overloaded(node, model, cc):
for instance in sorted(
model.mapping.get_node_instances(node),
key=lambda x: self.get_instance_utilization(
x, model)['cpu']
):
for dst_hypervisor in reversed(sorted_hypervisors):
if self.vm_fits(vm, dst_hypervisor, model, cc):
self.add_migration(vm, hypervisor,
dst_hypervisor, model)
for destination_node in reversed(sorted_nodes):
if self.instance_fits(
instance, destination_node, model, cc):
self.add_migration(instance, node,
destination_node, model)
break
if not self.is_overloaded(hypervisor, model, cc):
if not self.is_overloaded(node, model, cc):
break
def consolidation_phase(self, model, cc):
@@ -460,8 +463,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
This considers provided resource capacity coefficients.
Consolidation phase performing first-fit based bin packing.
First, hypervisors with the lowest cpu utilization are consolidated
by moving their load to hypervisors with the highest cpu utilization
First, nodes with the lowest cpu utilization are consolidated
by moving their load to nodes with the highest cpu utilization
which can accomodate the load. In this phase the most cpu utilizied
VMs are prioritizied as their load is more difficult to accomodate
in the system than less cpu utilizied VMs which can be later used
@@ -470,22 +473,23 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
"""
sorted_hypervisors = sorted(
model.get_all_hypervisors().values(),
key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
sorted_nodes = sorted(
model.get_all_compute_nodes().values(),
key=lambda x: self.get_node_utilization(x, model)['cpu'])
asc = 0
for hypervisor in sorted_hypervisors:
vms = sorted(model.get_mapping().get_node_vms(hypervisor),
key=lambda x: self.get_vm_utilization(x,
model)['cpu'])
for vm in reversed(vms):
dsc = len(sorted_hypervisors) - 1
for dst_hypervisor in reversed(sorted_hypervisors):
for node in sorted_nodes:
instances = sorted(
model.mapping.get_node_instances(node),
key=lambda x: self.get_instance_utilization(x, model)['cpu'])
for instance in reversed(instances):
dsc = len(sorted_nodes) - 1
for destination_node in reversed(sorted_nodes):
if asc >= dsc:
break
if self.vm_fits(vm, dst_hypervisor, model, cc):
self.add_migration(vm, hypervisor,
dst_hypervisor, model)
if self.instance_fits(
instance, destination_node, model, cc):
self.add_migration(instance, node,
destination_node, model)
break
dsc -= 1
asc += 1
@@ -504,7 +508,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
* Offload phase - handling over-utilized resources
* Consolidation phase - handling under-utilized resources
* Solution optimization - reducing number of migrations
* Disability of unused hypervisors
* Disability of unused nodes
:param original_model: root_model object
"""
@@ -524,14 +528,14 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
# Optimize solution
self.optimize_solution(model)
# disable unused hypervisors
self.disable_unused_hypervisors(model)
# disable unused nodes
self.disable_unused_nodes(model)
rcu_after = self.get_relative_cluster_utilization(model)
info = {
'number_of_migrations': self.number_of_migrations,
'number_of_released_hypervisors':
self.number_of_released_hypervisors,
'number_of_released_nodes':
self.number_of_released_nodes,
'relative_cluster_utilization_before': str(rcu),
'relative_cluster_utilization_after': str(rcu_after)
}
@@ -542,5 +546,5 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
# self.solution.efficacy = rcu_after['cpu']
self.solution.set_efficacy_indicators(
released_compute_nodes_count=self.number_of_migrations,
vm_migrations_count=self.number_of_released_hypervisors,
instance_migrations_count=self.number_of_released_nodes,
)

View File

@@ -21,8 +21,7 @@ from oslo_log import log
from watcher._i18n import _, _LE, _LI, _LW
from watcher.common import exception as wexc
from watcher.decision_engine.cluster.history import ceilometer as ceil
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
@@ -37,7 +36,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
servers. It generates solutions to move a workload whenever a server's
CPU utilization % is higher than the specified threshold.
The VM to be moved should make the host close to average workload
of all hypervisors.
of all compute nodes.
*Requirements*
@@ -115,78 +114,83 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
},
}
def calculate_used_resource(self, hypervisor, cap_cores, cap_mem,
def calculate_used_resource(self, node, cap_cores, cap_mem,
cap_disk):
"""Calculate the used vcpus, memory and disk based on VM flavors"""
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
instances = self.compute_model.mapping.get_node_instances(node)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for vm_id in vms:
vm = self.compute_model.get_vm_from_id(vm_id)
vcpus_used += cap_cores.get_capacity(vm)
memory_mb_used += cap_mem.get_capacity(vm)
disk_gb_used += cap_disk.get_capacity(vm)
for instance_id in instances:
instance = self.compute_model.get_instance_from_id(instance_id)
vcpus_used += cap_cores.get_capacity(instance)
memory_mb_used += cap_mem.get_capacity(instance)
disk_gb_used += cap_disk.get_capacity(instance)
return vcpus_used, memory_mb_used, disk_gb_used
def choose_vm_to_migrate(self, hosts, avg_workload, workload_cache):
"""Pick up an active vm instance to migrate from provided hosts
def choose_instance_to_migrate(self, hosts, avg_workload, workload_cache):
"""Pick up an active instance instance to migrate from provided hosts
:param hosts: the array of dict which contains hypervisor object
:param avg_workload: the average workload value of all hypervisors
:param workload_cache: the map contains vm to workload mapping
:param hosts: the array of dict which contains node object
:param avg_workload: the average workload value of all nodes
:param workload_cache: the map contains instance to workload mapping
"""
for hvmap in hosts:
source_hypervisor = hvmap['hv']
source_vms = self.compute_model.get_mapping().get_node_vms(
source_hypervisor)
if source_vms:
delta_workload = hvmap['workload'] - avg_workload
for instance_data in hosts:
source_node = instance_data['node']
source_instances = self.compute_model.mapping.get_node_instances(
source_node)
if source_instances:
delta_workload = instance_data['workload'] - avg_workload
min_delta = 1000000
instance_id = None
for vm_id in source_vms:
for inst_id in source_instances:
try:
# select the first active VM to migrate
vm = self.compute_model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.debug("VM not active; skipped: %s",
vm.uuid)
instance = self.compute_model.get_instance_from_id(
inst_id)
if (instance.state !=
element.InstanceState.ACTIVE.value):
LOG.debug("Instance not active, skipped: %s",
instance.uuid)
continue
current_delta = delta_workload - workload_cache[vm_id]
current_delta = (
delta_workload - workload_cache[inst_id])
if 0 <= current_delta < min_delta:
min_delta = current_delta
instance_id = vm_id
instance_id = inst_id
except wexc.InstanceNotFound:
LOG.error(_LE("VM not found; error: %s"), vm_id)
LOG.error(_LE("Instance not found; error: %s"),
instance_id)
if instance_id:
return (source_hypervisor,
self.compute_model.get_vm_from_id(instance_id))
return (source_node,
self.compute_model.get_instance_from_id(
instance_id))
else:
LOG.info(_LI("VM not found on hypervisor: %s"),
source_hypervisor.uuid)
LOG.info(_LI("VM not found from node: %s"),
source_node.uuid)
def filter_destination_hosts(self, hosts, vm_to_migrate,
def filter_destination_hosts(self, hosts, instance_to_migrate,
avg_workload, workload_cache):
'''Only return hosts with sufficient available resources'''
cap_cores = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
element.ResourceType.cpu_cores)
cap_disk = self.compute_model.get_resource_from_id(
resource.ResourceType.disk)
element.ResourceType.disk)
cap_mem = self.compute_model.get_resource_from_id(
resource.ResourceType.memory)
element.ResourceType.memory)
required_cores = cap_cores.get_capacity(vm_to_migrate)
required_disk = cap_disk.get_capacity(vm_to_migrate)
required_mem = cap_mem.get_capacity(vm_to_migrate)
required_cores = cap_cores.get_capacity(instance_to_migrate)
required_disk = cap_disk.get_capacity(instance_to_migrate)
required_mem = cap_mem.get_capacity(instance_to_migrate)
# filter hypervisors without enough resource
# filter nodes without enough resource
destination_hosts = []
src_vm_workload = workload_cache[vm_to_migrate.uuid]
for hvmap in hosts:
host = hvmap['hv']
workload = hvmap['workload']
src_instance_workload = workload_cache[instance_to_migrate.uuid]
for instance_data in hosts:
host = instance_data['node']
workload = instance_data['workload']
# calculate the available resources
cores_used, mem_used, disk_used = self.calculate_used_resource(
host, cap_cores, cap_mem, cap_disk)
@@ -197,29 +201,29 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem and
(src_vm_workload + workload) < self.threshold / 100 *
(src_instance_workload + workload) < self.threshold / 100 *
cap_cores.get_capacity(host)
):
destination_hosts.append(hvmap)
destination_hosts.append(instance_data)
return destination_hosts
def group_hosts_by_cpu_util(self):
"""Calculate the workloads of each hypervisor
"""Calculate the workloads of each node
try to find out the hypervisors which have reached threshold
and the hypervisors which are under threshold.
and also calculate the average workload value of all hypervisors.
and also generate the VM workload map.
try to find out the nodes which have reached threshold
and the nodes which are under threshold.
and also calculate the average workload value of all nodes.
and also generate the instance workload map.
"""
hypervisors = self.compute_model.get_all_hypervisors()
cluster_size = len(hypervisors)
if not hypervisors:
nodes = self.compute_model.get_all_compute_nodes()
cluster_size = len(nodes)
if not nodes:
raise wexc.ClusterEmpty()
# get cpu cores capacity of hypervisors and vms
# get cpu cores capacity of nodes and instances
cap_cores = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
element.ResourceType.cpu_cores)
overload_hosts = []
nonoverload_hosts = []
# total workload of cluster
@@ -227,16 +231,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
cluster_workload = 0.0
# use workload_cache to store the workload of VMs for reuse purpose
workload_cache = {}
for hypervisor_id in hypervisors:
hypervisor = self.compute_model.get_hypervisor_from_id(
hypervisor_id)
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
hypervisor_workload = 0.0
for vm_id in vms:
vm = self.compute_model.get_vm_from_id(vm_id)
for node_id in nodes:
node = self.compute_model.get_node_from_id(
node_id)
instances = self.compute_model.mapping.get_node_instances(node)
node_workload = 0.0
for instance_id in instances:
instance = self.compute_model.get_instance_from_id(instance_id)
try:
cpu_util = self.ceilometer.statistic_aggregation(
resource_id=vm_id,
resource_id=instance_id,
meter_name=self._meter,
period=self._period,
aggregate='avg')
@@ -245,24 +249,25 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
LOG.error(_LE("Can not get cpu_util from Ceilometer"))
continue
if cpu_util is None:
LOG.debug("VM (%s): cpu_util is None", vm_id)
LOG.debug("Instance (%s): cpu_util is None", instance_id)
continue
vm_cores = cap_cores.get_capacity(vm)
workload_cache[vm_id] = cpu_util * vm_cores / 100
hypervisor_workload += workload_cache[vm_id]
LOG.debug("VM (%s): cpu_util %f", vm_id, cpu_util)
hypervisor_cores = cap_cores.get_capacity(hypervisor)
hy_cpu_util = hypervisor_workload / hypervisor_cores * 100
instance_cores = cap_cores.get_capacity(instance)
workload_cache[instance_id] = cpu_util * instance_cores / 100
node_workload += workload_cache[instance_id]
LOG.debug("VM (%s): cpu_util %f", instance_id, cpu_util)
node_cores = cap_cores.get_capacity(node)
hy_cpu_util = node_workload / node_cores * 100
cluster_workload += hypervisor_workload
cluster_workload += node_workload
hvmap = {'hv': hypervisor, "cpu_util": hy_cpu_util, 'workload':
hypervisor_workload}
instance_data = {
'node': node, "cpu_util": hy_cpu_util,
'workload': node_workload}
if hy_cpu_util >= self.threshold:
# mark the hypervisor to release resources
overload_hosts.append(hvmap)
# mark the node to release resources
overload_hosts.append(instance_data)
else:
nonoverload_hosts.append(hvmap)
nonoverload_hosts.append(instance_data)
avg_workload = cluster_workload / cluster_size
@@ -285,52 +290,52 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
"""
self.threshold = self.input_parameters.threshold
self._period = self.input_parameters.period
src_hypervisors, target_hypervisors, avg_workload, workload_cache = (
source_nodes, target_nodes, avg_workload, workload_cache = (
self.group_hosts_by_cpu_util())
if not src_hypervisors:
if not source_nodes:
LOG.debug("No hosts require optimization")
return self.solution
if not target_hypervisors:
if not target_nodes:
LOG.warning(_LW("No hosts current have CPU utilization under %s "
"percent, therefore there are no possible target "
"hosts for any migrations"),
"hosts for any migration"),
self.threshold)
return self.solution
# choose the server with largest cpu_util
src_hypervisors = sorted(src_hypervisors,
reverse=True,
key=lambda x: (x[self.METER_NAME]))
source_nodes = sorted(source_nodes,
reverse=True,
key=lambda x: (x[self.METER_NAME]))
vm_to_migrate = self.choose_vm_to_migrate(
src_hypervisors, avg_workload, workload_cache)
if not vm_to_migrate:
instance_to_migrate = self.choose_instance_to_migrate(
source_nodes, avg_workload, workload_cache)
if not instance_to_migrate:
return self.solution
source_hypervisor, vm_src = vm_to_migrate
source_node, instance_src = instance_to_migrate
# find the hosts that have enough resource for the VM to be migrated
destination_hosts = self.filter_destination_hosts(
target_hypervisors, vm_src, avg_workload, workload_cache)
target_nodes, instance_src, avg_workload, workload_cache)
# sort the filtered result by workload
# pick up the lowest one as dest server
if not destination_hosts:
LOG.warning(_LW("No target host could be found; it might "
"be because there is not enough CPU, memory "
"or disk"))
# for instance.
LOG.warning(_LW("No proper target host could be found, it might "
"be because of there's no enough CPU/Memory/DISK"))
return self.solution
destination_hosts = sorted(destination_hosts,
key=lambda x: (x["cpu_util"]))
# always use the host with lowerest CPU utilization
mig_dst_hypervisor = destination_hosts[0]['hv']
# generate solution to migrate the vm to the dest server,
if self.compute_model.get_mapping().migrate_vm(
vm_src, source_hypervisor, mig_dst_hypervisor):
mig_destination_node = destination_hosts[0]['node']
# generate solution to migrate the instance to the dest server,
if self.compute_model.mapping.migrate_instance(
instance_src, source_node, mig_destination_node):
parameters = {'migration_type': 'live',
'src_hypervisor': source_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid}
'source_node': source_node.uuid,
'destination_node': mig_destination_node.uuid}
self.solution.add_action(action_type=self.MIGRATION,
resource_id=vm_src.uuid,
resource_id=instance_src.uuid,
input_parameters=parameters)
def post_execute(self):

View File

@@ -30,8 +30,7 @@ from watcher._i18n import _LI, _
from watcher.common import exception
from watcher.decision_engine.cluster.history import ceilometer as \
ceilometer_cluster_history
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
@@ -39,8 +38,8 @@ LOG = log.getLogger(__name__)
metrics = ['cpu_util', 'memory.resident']
thresholds_dict = {'cpu_util': 0.2, 'memory.resident': 0.2}
weights_dict = {'cpu_util_weight': 1.0, 'memory.resident_weight': 1.0}
vm_host_measures = {'cpu_util': 'hardware.cpu.util',
'memory.resident': 'hardware.memory.used'}
instance_host_measures = {'cpu_util': 'hardware.cpu.util',
'memory.resident': 'hardware.memory.used'}
ws_opts = [
cfg.ListOpt('metrics',
@@ -154,73 +153,75 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
def ceilometer(self, c):
self._ceilometer = c
def transform_vm_cpu(self, vm_load, host_vcpus):
"""This method transforms vm cpu utilization to overall host cpu utilization.
def transform_instance_cpu(self, instance_load, host_vcpus):
"""Transform instance cpu utilization to overall host cpu utilization.
:param vm_load: dict that contains vm uuid and utilization info.
:param instance_load: dict that contains instance uuid and
utilization info.
:param host_vcpus: int
:return: float value
"""
return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus))
return (instance_load['cpu_util'] *
(instance_load['vcpus'] / float(host_vcpus)))
@MEMOIZE
def get_vm_load(self, vm_uuid):
"""Gathering vm load through ceilometer statistic.
def get_instance_load(self, instance_uuid):
"""Gathering instance load through ceilometer statistic.
:param vm_uuid: vm for which statistic is gathered.
:param instance_uuid: instance for which statistic is gathered.
:return: dict
"""
LOG.debug('get_vm_load started')
vm_vcpus = self.compute_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(
self.compute_model.get_vm_from_id(vm_uuid))
vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus}
LOG.debug('get_instance_load started')
instance_vcpus = self.compute_model.get_resource_from_id(
element.ResourceType.cpu_cores).get_capacity(
self.compute_model.get_instance_from_id(instance_uuid))
instance_load = {'uuid': instance_uuid, 'vcpus': instance_vcpus}
for meter in self.metrics:
avg_meter = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid,
resource_id=instance_uuid,
meter_name=meter,
period="120",
aggregate='min'
)
if avg_meter is None:
raise exception.NoMetricValuesForVM(resource_id=vm_uuid,
metric_name=meter)
vm_load[meter] = avg_meter
return vm_load
raise exception.NoMetricValuesForInstance(
resource_id=instance_uuid, metric_name=meter)
instance_load[meter] = avg_meter
return instance_load
def normalize_hosts_load(self, hosts):
normalized_hosts = deepcopy(hosts)
for host in normalized_hosts:
if 'memory.resident' in normalized_hosts[host]:
h_memory = self.compute_model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(
self.compute_model.get_hypervisor_from_id(host))
element.ResourceType.memory).get_capacity(
self.compute_model.get_node_from_id(host))
normalized_hosts[host]['memory.resident'] /= float(h_memory)
return normalized_hosts
def get_hosts_load(self):
    """Get load of every compute node by gathering instance load.

    :return: dict keyed by node id; each value holds the node's vCPU
        capacity under 'vcpus' plus one averaged value per metric in
        ``self.metrics``.
    :raises: exception.NoSuchMetricForHost if a host-level meter has
        no data for a node.
    """
    hosts_load = {}
    for node_id in self.compute_model.get_all_compute_nodes():
        hosts_load[node_id] = {}
        host_vcpus = self.compute_model.get_resource_from_id(
            element.ResourceType.cpu_cores).get_capacity(
                self.compute_model.get_node_from_id(node_id))
        hosts_load[node_id]['vcpus'] = host_vcpus
        for metric in self.metrics:
            # instance_host_measures maps the instance-level metric
            # name to its host-level Ceilometer counterpart.
            avg_meter = self.ceilometer.statistic_aggregation(
                resource_id=node_id,
                meter_name=instance_host_measures[metric],
                period="60",
                aggregate='avg'
            )
            if avg_meter is None:
                raise exception.NoSuchMetricForHost(
                    metric=instance_host_measures[metric],
                    host=node_id)
            hosts_load[node_id][metric] = avg_meter
    return hosts_load
def get_sd(self, hosts, meter_name):
@@ -249,33 +250,34 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
" for %s in weight dict.") % metric)
return weighted_sd
def calculate_migration_case(self, hosts, instance_id,
                             src_node_id, dst_node_id):
    """Calculate a simulated migration case.

    Return the list of standard deviation values that would result
    from migrating the instance from the source node to the
    destination node.

    :param hosts: hosts with their workload
    :param instance_id: the virtual machine to (virtually) migrate
    :param src_node_id: the source node id
    :param dst_node_id: the destination node id
    :return: list of standard deviation values (one per metric), with
        the simulated hosts-load dict appended as the last element.
    """
    migration_case = []
    new_hosts = deepcopy(hosts)
    instance_load = self.get_instance_load(instance_id)
    d_host_vcpus = new_hosts[dst_node_id]['vcpus']
    s_host_vcpus = new_hosts[src_node_id]['vcpus']
    for metric in self.metrics:
        # NOTE: '==' instead of 'is' — identity comparison on string
        # literals is an interning accident, not a contract.
        if metric == 'cpu_util':
            # CPU load must be rescaled to each host's vCPU capacity.
            new_hosts[src_node_id][metric] -= self.transform_instance_cpu(
                instance_load,
                s_host_vcpus)
            new_hosts[dst_node_id][metric] += self.transform_instance_cpu(
                instance_load,
                d_host_vcpus)
        else:
            new_hosts[src_node_id][metric] -= instance_load[metric]
            new_hosts[dst_node_id][metric] += instance_load[metric]
    normalized_hosts = self.normalize_hosts_load(new_hosts)
    for metric in self.metrics:
        migration_case.append(self.get_sd(normalized_hosts, metric))
    # Callers read the simulated load back via migration_case[-1].
    migration_case.append(new_hosts)
    return migration_case
def simulate_migrations(self, hosts):
    """Make a sorted list of instance:dst_host migration candidates.

    For each instance on each node, simulate migrating it to candidate
    destination nodes and keep the candidate with the lowest weighted
    standard deviation.

    :param hosts: hosts with their workload
    :return: list of dicts ('instance', 'host', 's_host', 'value'),
        sorted by ascending weighted standard deviation.
    """
    def yield_nodes(nodes):
        # Destination-candidate generator; behavior depends on the
        # configured host_choice strategy.
        ct = CONF['watcher_strategies.workload_stabilization'].retry_count
        if self.host_choice == 'cycle':
            for i in itertools.cycle(nodes):
                yield [i]
        if self.host_choice == 'retry':
            while True:
                yield random.sample(nodes, ct)
        if self.host_choice == 'fullsearch':
            while True:
                yield nodes

    instance_host_map = []
    for source_hp_id in self.compute_model.get_all_compute_nodes():
        nodes = list(self.compute_model.get_all_compute_nodes())
        nodes.remove(source_hp_id)
        node_list = yield_nodes(nodes)
        instances_id = self.compute_model.get_mapping(). \
            get_node_instances_from_id(source_hp_id)
        for instance_id in instances_id:
            min_sd_case = {'value': len(self.metrics)}
            instance = self.compute_model.get_instance_from_id(instance_id)
            # Only active/paused instances can be live-migrated.
            if instance.state not in [element.InstanceState.ACTIVE.value,
                                      element.InstanceState.PAUSED.value]:
                continue
            for dst_node_id in next(node_list):
                sd_case = self.calculate_migration_case(hosts, instance_id,
                                                        source_hp_id,
                                                        dst_node_id)
                # Drop the trailing hosts-load dict before weighting.
                weighted_sd = self.calculate_weighted_sd(sd_case[:-1])
                if weighted_sd < min_sd_case['value']:
                    min_sd_case = {
                        'host': dst_node_id, 'value': weighted_sd,
                        's_host': source_hp_id, 'instance': instance_id}
                    instance_host_map.append(min_sd_case)
                # NOTE(review): only the first batch yielded by
                # yield_nodes is evaluated per instance — confirm this
                # early break is intentional.
                break
    return sorted(instance_host_map, key=lambda x: x['value'])
def check_threshold(self):
"""Check if cluster is needed in balancing"""
@@ -335,32 +338,32 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
def add_migration(self,
                  resource_id,
                  migration_type,
                  source_node,
                  destination_node):
    """Append a migration action to the current solution.

    :param resource_id: uuid of the instance to migrate.
    :param migration_type: e.g. 'live'.
    :param source_node: uuid of the node the instance leaves.
    :param destination_node: uuid of the node the instance joins.
    """
    parameters = {'migration_type': migration_type,
                  'source_node': source_node,
                  'destination_node': destination_node}
    self.solution.add_action(action_type=self.MIGRATION,
                             resource_id=resource_id,
                             input_parameters=parameters)
def create_migration_instance(self, mig_instance, mig_source_node,
                              mig_destination_node):
    """Create a live-migration action for one instance.

    The migration is first applied to the in-memory compute model; the
    action is only added to the solution if the model accepts it.

    :param mig_instance: instance object to migrate.
    :param mig_source_node: node object the instance currently runs on.
    :param mig_destination_node: node object to migrate to.
    """
    if self.compute_model.get_mapping().migrate_instance(
            mig_instance, mig_source_node, mig_destination_node):
        self.add_migration(mig_instance.uuid, 'live',
                           mig_source_node.uuid,
                           mig_destination_node.uuid)
def migrate(self, instance_uuid, src_host, dst_host):
    """Resolve model objects by id and create the migration action.

    :param instance_uuid: uuid of the instance to migrate.
    :param src_host: id of the source node.
    :param dst_host: id of the destination node.
    """
    mig_instance = self.compute_model.get_instance_from_id(instance_uuid)
    mig_source_node = self.compute_model.get_node_from_id(
        src_host)
    mig_destination_node = self.compute_model.get_node_from_id(
        dst_host)
    self.create_migration_instance(mig_instance, mig_source_node,
                                   mig_destination_node)
def fill_solution(self):
self.solution.model = self.compute_model
@@ -378,28 +381,29 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
hosts_load = self.get_hosts_load()
min_sd = 1
balanced = False
for vm_host in migration:
for instance_host in migration:
dst_hp_disk = self.compute_model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(
self.compute_model.get_hypervisor_from_id(
vm_host['host']))
vm_disk = self.compute_model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(
self.compute_model.get_vm_from_id(vm_host['vm']))
if vm_disk > dst_hp_disk:
element.ResourceType.disk).get_capacity(
self.compute_model.get_node_from_id(
instance_host['host']))
instance_disk = self.compute_model.get_resource_from_id(
element.ResourceType.disk).get_capacity(
self.compute_model.get_instance_from_id(
instance_host['instance']))
if instance_disk > dst_hp_disk:
continue
vm_load = self.calculate_migration_case(hosts_load,
vm_host['vm'],
vm_host['s_host'],
vm_host['host'])
weighted_sd = self.calculate_weighted_sd(vm_load[:-1])
instance_load = self.calculate_migration_case(
hosts_load, instance_host['instance'],
instance_host['s_host'], instance_host['host'])
weighted_sd = self.calculate_weighted_sd(instance_load[:-1])
if weighted_sd < min_sd:
min_sd = weighted_sd
hosts_load = vm_load[-1]
self.migrate(vm_host['vm'],
vm_host['s_host'], vm_host['host'])
hosts_load = instance_load[-1]
self.migrate(instance_host['instance'],
instance_host['s_host'],
instance_host['host'])
for metric, value in zip(self.metrics, vm_load[:-1]):
for metric, value in zip(self.metrics, instance_load[:-1]):
if value < float(self.thresholds[metric]):
balanced = True
break