Remove obsolete Resource element

Partially Implements: blueprint graph-based-cluster-model

Change-Id: I2765d1c7a864d6658c847dcd988314fc8f11049c
This commit is contained in:
Vincent Françoise
2017-01-11 15:29:25 +01:00
parent d433d6b3c8
commit edd3d219d5
21 changed files with 286 additions and 594 deletions

View File

@@ -108,12 +108,15 @@ import copy
import threading
from oslo_config import cfg
from oslo_log import log
import six
from watcher.common import clients
from watcher.common.loader import loadable
from watcher.decision_engine.model import model_root
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class BaseClusterDataModelCollector(loadable.LoadableSingleton):
@@ -169,6 +172,8 @@ class BaseClusterDataModelCollector(loadable.LoadableSingleton):
]
def get_latest_cluster_data_model(self):
LOG.debug("Creating copy")
LOG.debug(self.cluster_data_model.to_xml())
return copy.deepcopy(self.cluster_data_model)
def synchronize(self):

View File

@@ -132,7 +132,6 @@ class ModelBuilder(object):
compute_service = self.nova_helper.get_service(node.service["id"])
node_attributes = {
"id": node.id,
"human_id": None, # TODO(v-francoise): get rid of it
"uuid": compute_service.host,
"hostname": node.hypervisor_hostname,
"memory": node.memory_mb,

View File

@@ -16,10 +16,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.model.element import disk_info
from watcher.decision_engine.model.element import instance
from watcher.decision_engine.model.element import node
from watcher.decision_engine.model.element import resource
ServiceState = node.ServiceState
ComputeNode = node.ComputeNode
@@ -27,12 +25,4 @@ ComputeNode = node.ComputeNode
InstanceState = instance.InstanceState
Instance = instance.Instance
DiskInfo = disk_info.DiskInfo
ResourceType = resource.ResourceType
Resource = resource.Resource
__all__ = [
'ServiceState', 'ComputeNode', 'InstanceState', 'Instance',
'DiskInfo', 'ResourceType', 'Resource']
__all__ = ['ServiceState', 'ComputeNode', 'InstanceState', 'Instance']

View File

@@ -1,59 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.model.element import base
class DiskInfo(base.Element):
    """Disk metadata holder for a model element.

    Tracks a device's name, major/minor numbers, size and the I/O
    scheduler assigned to it. Mutation happens through the explicit
    ``set_*`` helpers below.
    """

    def __init__(self):
        # Identification of the block device.
        self.name = ""
        self.major = 0
        self.minor = 0
        # Size in bytes (see set_size).
        self.size = 0
        # I/O scheduler name, e.g. noop / cfq / deadline.
        self.scheduler = ""

    def accept(self, visitor):
        """Visitor hook — not supported by this element."""
        raise NotImplementedError()

    def set_size(self, size):
        """Record the disk size.

        :param size: size in bytes
        """
        self.size = size

    def get_size(self):
        """Return the recorded disk size in bytes."""
        return self.size

    def set_scheduler(self, scheduler):
        """Record the I/O scheduler (e.g. noop, cfq, deadline).

        :param scheduler: scheduler name
        """
        self.scheduler = scheduler

    def set_device_name(self, name):
        """Record the device name.

        :param name: device name
        """
        self.name = name

    def get_device_name(self):
        """Return the recorded device name."""
        return self.name

View File

@@ -1,64 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
from watcher.common import exception
class ResourceType(enum.Enum):
    """Kinds of capacity tracked for a compute element.

    NOTE: ``cpu_cores`` and ``vcpus`` share the value ``'vcpus'``; per
    stdlib enum semantics the later name (``vcpus``) is an alias of the
    earlier member (``cpu_cores``), so both compare equal and resolve to
    the same member.
    """
    cpu_cores = 'vcpus'
    vcpus = 'vcpus'  # alias of cpu_cores (duplicate value)
    memory = 'memory'
    disk = 'disk'
    disk_capacity = 'disk_capacity'
class Resource(object):
    """A named resource with per-element capacity values.

    Capacities are stored in ``mapping`` keyed by each element's
    ``uuid`` attribute.
    """

    def __init__(self, name, capacity=None):
        """Initialize the resource.

        :param name: ResourceType identifying this resource
        :param capacity: maximum capacity, if known
        """
        self._name = name
        self.capacity = capacity
        # element uuid -> capacity value
        self.mapping = {}

    @property
    def name(self):
        """The resource's name/type."""
        return self._name

    @name.setter
    def name(self, new_name):
        self._name = new_name

    def set_capacity(self, element, value):
        """Record *value* as the capacity of *element*."""
        self.mapping[element.uuid] = value

    def unset_capacity(self, element):
        """Forget the capacity recorded for *element*."""
        del self.mapping[element.uuid]

    def get_capacity_by_uuid(self, uuid):
        """Return the capacity recorded under *uuid*.

        :raises: exception.CapacityNotDefined if no value was recorded
        """
        key = str(uuid)
        try:
            return self.mapping[key]
        except KeyError:
            raise exception.CapacityNotDefined(
                capacity=self.name.value, resource=key)

    def get_capacity(self, element):
        """Return the capacity recorded for *element* (by its uuid)."""
        return self.get_capacity_by_uuid(element.uuid)

View File

@@ -172,22 +172,6 @@ class ModelRoot(nx.DiGraph, base.Model):
return {inst.uuid: inst for inst in self.nodes()
if isinstance(inst, element.Instance)}
@lockutils.synchronized("model_root")
def get_resource_by_uuid(self, resource_id):
# TODO(v-francoise): deprecate this method
# This is a trick to keep the compatibility with the old model root
class Resource(object):
def __init__(self, resource_id):
if isinstance(resource_id, element.ResourceType):
resource_id = resource_id.value
self.resource_id = resource_id
def get_capacity(self, element):
# We ignore element because value already contains the value
return getattr(element, self.resource_id)
return Resource(resource_id)
@lockutils.synchronized("model_root")
def get_node_instances(self, node):
self.assert_node(node)

View File

@@ -64,23 +64,19 @@ class NovaNotification(base.NotificationEndpoint):
instance_data = data['nova_object.data']
instance_flavor_data = instance_data['flavor']['nova_object.data']
instance.update({
'state': instance_data['state'],
'hostname': instance_data['host_name'],
'human_id': instance_data['display_name'],
})
memory_mb = instance_flavor_data['memory_mb']
num_cores = instance_flavor_data['vcpus']
disk_gb = instance_flavor_data['root_gb']
self.update_capacity(element.ResourceType.memory, instance, memory_mb)
self.update_capacity(
element.ResourceType.vcpus, instance, num_cores)
self.update_capacity(
element.ResourceType.disk, instance, disk_gb)
self.update_capacity(
element.ResourceType.disk_capacity, instance, disk_gb)
instance.update({
'state': instance_data['state'],
'hostname': instance_data['host_name'],
'human_id': instance_data['display_name'],
'memory': memory_mb,
'vcpus': num_cores,
'disk': disk_gb,
'disk_capacity': disk_gb,
})
try:
node = self.get_or_create_node(instance_data['host'])
@@ -91,27 +87,20 @@ class NovaNotification(base.NotificationEndpoint):
self.update_instance_mapping(instance, node)
def update_capacity(self, resource_id, obj, value):
setattr(obj, resource_id.value, value)
def legacy_update_instance(self, instance, data):
instance.update({
'state': data['state'],
'hostname': data['hostname'],
'human_id': data['display_name'],
})
memory_mb = data['memory_mb']
num_cores = data['vcpus']
disk_gb = data['root_gb']
self.update_capacity(element.ResourceType.memory, instance, memory_mb)
self.update_capacity(
element.ResourceType.vcpus, instance, num_cores)
self.update_capacity(
element.ResourceType.disk, instance, disk_gb)
self.update_capacity(
element.ResourceType.disk_capacity, instance, disk_gb)
instance.update({
'state': data['state'],
'hostname': data['hostname'],
'human_id': data['display_name'],
'memory': memory_mb,
'vcpus': num_cores,
'disk': disk_gb,
'disk_capacity': disk_gb,
})
try:
node = self.get_or_create_node(data['host'])
@@ -147,16 +136,12 @@ class NovaNotification(base.NotificationEndpoint):
uuid=node_hostname,
hostname=_node.hypervisor_hostname,
state=_node.state,
status=_node.status)
self.update_capacity(
element.ResourceType.memory, node, _node.memory_mb)
self.update_capacity(
element.ResourceType.vcpus, node, _node.vcpus)
self.update_capacity(
element.ResourceType.disk, node, _node.free_disk_gb)
self.update_capacity(
element.ResourceType.disk_capacity, node, _node.local_gb)
status=_node.status,
memory=_node.memory_mb,
vcpus=_node.vcpus,
disk=_node.free_disk_gb,
disk_capacity=_node.local_gb,
)
return node
except Exception as exc:
LOG.exception(exc)
@@ -176,6 +161,7 @@ class NovaNotification(base.NotificationEndpoint):
node = self.create_compute_node(uuid)
LOG.debug("New compute node created: %s", uuid)
self.cluster_data_model.add_node(node)
LOG.debug("New compute node mapped: %s", uuid)
return node
def update_instance_mapping(self, instance, node):

View File

@@ -173,23 +173,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
total_cores = 0
total_disk = 0
total_mem = 0
cpu_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk)
memory_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory)
for instance in self.compute_model.get_node_instances(
destination_node):
total_cores += cpu_capacity.get_capacity(instance)
total_disk += disk_capacity.get_capacity(instance)
total_mem += memory_capacity.get_capacity(instance)
total_cores += instance.vcpus
total_disk += instance.disk
total_mem += instance.memory
# capacity requested by the compute node
total_cores += cpu_capacity.get_capacity(instance_to_migrate)
total_disk += disk_capacity.get_capacity(instance_to_migrate)
total_mem += memory_capacity.get_capacity(instance_to_migrate)
total_cores += instance_to_migrate.vcpus
total_disk += instance_to_migrate.disk
total_mem += instance_to_migrate.memory
return self.check_threshold(destination_node, total_cores, total_disk,
total_mem)
@@ -208,12 +201,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
:param total_mem: total memory used by the virtual machine
:return: True if the threshold is not exceed
"""
cpu_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus).get_capacity(destination_node)
disk_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk).get_capacity(destination_node)
memory_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory).get_capacity(destination_node)
cpu_capacity = destination_node.vcpus
disk_capacity = destination_node.disk
memory_capacity = destination_node.memory
return (cpu_capacity >= total_cores * self.threshold_cores and
disk_capacity >= total_disk * self.threshold_disk and
@@ -229,14 +219,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
:param total_memory_used:
:return:
"""
cpu_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus).get_capacity(compute_resource)
disk_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk).get_capacity(compute_resource)
memory_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory).get_capacity(compute_resource)
cpu_capacity = compute_resource.vcpus
disk_capacity = compute_resource.disk
memory_capacity = compute_resource.memory
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
float(cpu_capacity))
@@ -331,10 +316,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
self.config.datasource]['host_cpu_usage']))
host_avg_cpu_util = 100
cpu_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus).get_capacity(node)
total_cores_used = cpu_capacity * (host_avg_cpu_util / 100.0)
total_cores_used = node.vcpus * (host_avg_cpu_util / 100.0)
return self.calculate_weight(node, total_cores_used, 0, 0)
@@ -354,10 +336,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
self.config.datasource]['instance_cpu_usage']))
instance_cpu_utilization = 100
cpu_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus).get_capacity(instance)
total_cores_used = cpu_capacity * (instance_cpu_utilization / 100.0)
total_cores_used = instance.vcpus * (instance_cpu_utilization / 100.0)
return self.calculate_weight(instance, total_cores_used, 0, 0)

View File

@@ -121,17 +121,16 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
def ceilometer(self, c):
self._ceilometer = c
def calc_used_res(self, node, cpu_capacity,
memory_capacity, disk_capacity):
def calc_used_resource(self, node):
"""Calculate the used vcpus, memory and disk based on VM flavors"""
instances = self.compute_model.get_node_instances(node)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for instance in instances:
vcpus_used += cpu_capacity.get_capacity(instance)
memory_mb_used += memory_capacity.get_capacity(instance)
disk_gb_used += disk_capacity.get_capacity(instance)
vcpus_used += instance.vcpus
memory_mb_used += instance.memory
disk_gb_used += instance.disk
return vcpus_used, memory_mb_used, disk_gb_used
@@ -189,27 +188,19 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
def filter_dest_servers(self, hosts, instance_to_migrate):
"""Only return hosts with sufficient available resources"""
cpu_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk)
memory_capacity = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory)
required_cores = cpu_capacity.get_capacity(instance_to_migrate)
required_disk = disk_capacity.get_capacity(instance_to_migrate)
required_memory = memory_capacity.get_capacity(instance_to_migrate)
required_cores = instance_to_migrate.vcpus
required_disk = instance_to_migrate.disk
required_memory = instance_to_migrate.memory
# filter nodes without enough resource
dest_servers = []
for instance_data in hosts:
host = instance_data['node']
# available
cores_used, mem_used, disk_used = self.calc_used_res(
host, cpu_capacity, memory_capacity, disk_capacity)
cores_available = cpu_capacity.get_capacity(host) - cores_used
disk_available = disk_capacity.get_capacity(host) - disk_used
mem_available = memory_capacity.get_capacity(host) - mem_used
cores_used, mem_used, disk_used = self.calc_used_resource(host)
cores_available = host.vcpus - cores_used
disk_available = host.disk - disk_used
mem_available = host.memory - mem_used
if cores_available >= required_cores \
and disk_available >= required_disk \
and mem_available >= required_memory:

View File

@@ -164,16 +164,16 @@ class UniformAirflow(base.BaseStrategy):
},
}
def calculate_used_resource(self, node, cap_cores, cap_mem, cap_disk):
def calculate_used_resource(self, node):
"""Compute the used vcpus, memory and disk based on instance flavors"""
instances = self.compute_model.get_node_instances(node)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for instance in instances:
vcpus_used += cap_cores.get_capacity(instance)
memory_mb_used += cap_mem.get_capacity(instance)
disk_gb_used += cap_disk.get_capacity(instance)
vcpus_used += instance.vcpus
memory_mb_used += instance.memory
disk_gb_used += instance.disk
return vcpus_used, memory_mb_used, disk_gb_used
@@ -221,23 +221,16 @@ class UniformAirflow(base.BaseStrategy):
def filter_destination_hosts(self, hosts, instances_to_migrate):
"""Find instance and host with sufficient available resources"""
cap_cores = self.compute_model.get_resource_by_uuid(
element.ResourceType.cpu_cores)
cap_disk = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk)
cap_mem = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory)
# large instance go first
# large instances go first
instances_to_migrate = sorted(
instances_to_migrate, reverse=True,
key=lambda x: (cap_cores.get_capacity(x)))
key=lambda x: (x.vcpus))
# find hosts for instances
destination_hosts = []
for instance_to_migrate in instances_to_migrate:
required_cores = cap_cores.get_capacity(instance_to_migrate)
required_disk = cap_disk.get_capacity(instance_to_migrate)
required_mem = cap_mem.get_capacity(instance_to_migrate)
required_cores = instance_to_migrate.vcpus
required_disk = instance_to_migrate.disk
required_mem = instance_to_migrate.memory
dest_migrate_info = {}
for nodemap in hosts:
host = nodemap['node']
@@ -245,13 +238,13 @@ class UniformAirflow(base.BaseStrategy):
# calculate the available resources
nodemap['cores_used'], nodemap['mem_used'],\
nodemap['disk_used'] = self.calculate_used_resource(
host, cap_cores, cap_mem, cap_disk)
cores_available = (cap_cores.get_capacity(host) -
host)
cores_available = (host.vcpus -
nodemap['cores_used'])
disk_available = (cap_disk.get_capacity(host) -
disk_available = (host.disk -
nodemap['disk_used'])
mem_available = (
cap_mem.get_capacity(host) - nodemap['mem_used'])
host.memory - nodemap['mem_used'])
if (cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem):

View File

@@ -139,14 +139,12 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
input_parameters=params)
self.number_of_released_nodes += 1
def add_migration(self, instance, source_node,
destination_node, model):
def add_migration(self, instance, source_node, destination_node):
"""Add an action for VM migration into the solution.
:param instance: instance object
:param source_node: node object
:param destination_node: node object
:param model: model_root object
:return: None
"""
instance_state_str = self.get_state_str(instance.state)
@@ -168,7 +166,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
if destination_node_state_str == element.ServiceState.DISABLED.value:
self.add_action_enable_compute_node(destination_node)
if model.migrate_instance(instance, source_node, destination_node):
if self.compute_model.migrate_instance(
instance, source_node, destination_node):
params = {'migration_type': migration_type,
'source_node': source_node.uuid,
'destination_node': destination_node.uuid}
@@ -177,30 +176,28 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
input_parameters=params)
self.number_of_migrations += 1
def disable_unused_nodes(self, model):
def disable_unused_nodes(self):
"""Generate actions for disabling unused nodes.
:param model: model_root object
:return: None
"""
for node in model.get_all_compute_nodes().values():
if (len(model.get_node_instances(node)) == 0 and
for node in self.compute_model.get_all_compute_nodes().values():
if (len(self.compute_model.get_node_instances(node)) == 0 and
node.status !=
element.ServiceState.DISABLED.value):
self.add_action_disable_node(node)
def get_instance_utilization(self, instance_uuid, model,
def get_instance_utilization(self, instance,
period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a VM.
:param instance_uuid: instance object
:param model: model_root object
:param instance: instance object
:param period: seconds
:param aggr: string
:return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
"""
if instance_uuid in self.ceilometer_instance_data_cache.keys():
return self.ceilometer_instance_data_cache.get(instance_uuid)
if instance.uuid in self.ceilometer_instance_data_cache.keys():
return self.ceilometer_instance_data_cache.get(instance.uuid)
cpu_util_metric = 'cpu_util'
ram_util_metric = 'memory.usage'
@@ -208,58 +205,54 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
ram_alloc_metric = 'memory'
disk_alloc_metric = 'disk.root.size'
instance_cpu_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=cpu_util_metric,
resource_id=instance.uuid, meter_name=cpu_util_metric,
period=period, aggregate=aggr)
instance_cpu_cores = model.get_resource_by_uuid(
element.ResourceType.cpu_cores).get_capacity(
model.get_instance_by_uuid(instance_uuid))
if instance_cpu_util:
total_cpu_utilization = (
instance_cpu_cores * (instance_cpu_util / 100.0))
instance.vcpus * (instance_cpu_util / 100.0))
else:
total_cpu_utilization = instance_cpu_cores
total_cpu_utilization = instance.vcpus
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=ram_util_metric,
resource_id=instance.uuid, meter_name=ram_util_metric,
period=period, aggregate=aggr)
if not instance_ram_util:
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=ram_alloc_metric,
resource_id=instance.uuid, meter_name=ram_alloc_metric,
period=period, aggregate=aggr)
instance_disk_util = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid, meter_name=disk_alloc_metric,
resource_id=instance.uuid, meter_name=disk_alloc_metric,
period=period, aggregate=aggr)
if not instance_ram_util or not instance_disk_util:
LOG.error(
_LE('No values returned by %s for memory.usage '
'or disk.root.size'), instance_uuid)
'or disk.root.size'), instance.uuid)
raise exception.NoDataFound
self.ceilometer_instance_data_cache[instance_uuid] = dict(
self.ceilometer_instance_data_cache[instance.uuid] = dict(
cpu=total_cpu_utilization, ram=instance_ram_util,
disk=instance_disk_util)
return self.ceilometer_instance_data_cache.get(instance_uuid)
return self.ceilometer_instance_data_cache.get(instance.uuid)
def get_node_utilization(self, node, model, period=3600, aggr='avg'):
def get_node_utilization(self, node, period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a node.
:param node: node object
:param model: model_root object
:param period: seconds
:param aggr: string
:return: dict(cpu(number of cores used), ram(MB used), disk(B used))
"""
node_instances = model.get_node_instances(node)
node_instances = self.compute_model.get_node_instances(node)
node_ram_util = 0
node_disk_util = 0
node_cpu_util = 0
for instance in node_instances:
instance_util = self.get_instance_utilization(
instance.uuid, model, period, aggr)
instance, period, aggr)
node_cpu_util += instance_util['cpu']
node_ram_util += instance_util['ram']
node_disk_util += instance_util['disk']
@@ -267,53 +260,40 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
return dict(cpu=node_cpu_util, ram=node_ram_util,
disk=node_disk_util)
def get_node_capacity(self, node, model):
def get_node_capacity(self, node):
"""Collect cpu, ram and disk capacity of a node.
:param node: node object
:param model: model_root object
:return: dict(cpu(cores), ram(MB), disk(B))
"""
node_cpu_capacity = model.get_resource_by_uuid(
element.ResourceType.cpu_cores).get_capacity(node)
return dict(cpu=node.vcpus, ram=node.memory, disk=node.disk_capacity)
node_disk_capacity = model.get_resource_by_uuid(
element.ResourceType.disk_capacity).get_capacity(node)
node_ram_capacity = model.get_resource_by_uuid(
element.ResourceType.memory).get_capacity(node)
return dict(cpu=node_cpu_capacity, ram=node_ram_capacity,
disk=node_disk_capacity)
def get_relative_node_utilization(self, node, model):
"""Return relative node utilization (rhu).
def get_relative_node_utilization(self, node):
"""Return relative node utilization.
:param node: node object
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
rhu = {}
util = self.get_node_utilization(node, model)
cap = self.get_node_capacity(node, model)
relative_node_utilization = {}
util = self.get_node_utilization(node)
cap = self.get_node_capacity(node)
for k in util.keys():
rhu[k] = float(util[k]) / float(cap[k])
return rhu
relative_node_utilization[k] = float(util[k]) / float(cap[k])
return relative_node_utilization
def get_relative_cluster_utilization(self, model):
def get_relative_cluster_utilization(self):
"""Calculate relative cluster utilization (rcu).
RCU is an average of relative utilizations (rhu) of active nodes.
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
nodes = model.get_all_compute_nodes().values()
nodes = self.compute_model.get_all_compute_nodes().values()
rcu = {}
counters = {}
for node in nodes:
node_state_str = self.get_state_str(node.state)
if node_state_str == element.ServiceState.ENABLED.value:
rhu = self.get_relative_node_utilization(
node, model)
rhu = self.get_relative_node_utilization(node)
for k in rhu.keys():
if k not in rcu:
rcu[k] = 0
@@ -325,39 +305,35 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
rcu[k] /= counters[k]
return rcu
def is_overloaded(self, node, model, cc):
def is_overloaded(self, node, cc):
"""Indicate whether a node is overloaded.
This considers provided resource capacity coefficients (cc).
:param node: node object
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
:return: [True, False]
"""
node_capacity = self.get_node_capacity(node, model)
node_capacity = self.get_node_capacity(node)
node_utilization = self.get_node_utilization(
node, model)
node)
metrics = ['cpu']
for m in metrics:
if node_utilization[m] > node_capacity[m] * cc[m]:
return True
return False
def instance_fits(self, instance_uuid, node, model, cc):
def instance_fits(self, instance, node, cc):
"""Indicate whether is a node able to accommodate a VM.
This considers provided resource capacity coefficients (cc).
:param instance_uuid: string
:param instance: :py:class:`~.element.Instance`
:param node: node object
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
:return: [True, False]
"""
node_capacity = self.get_node_capacity(node, model)
node_utilization = self.get_node_utilization(
node, model)
instance_utilization = self.get_instance_utilization(
instance_uuid, model)
node_capacity = self.get_node_capacity(node)
node_utilization = self.get_node_utilization(node)
instance_utilization = self.get_instance_utilization(instance)
metrics = ['cpu', 'ram', 'disk']
for m in metrics:
if (instance_utilization[m] + node_utilization[m] >
@@ -365,7 +341,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
return False
return True
def optimize_solution(self, model):
def optimize_solution(self):
"""Optimize solution.
This is done by eliminating unnecessary or circular set of migrations
@@ -378,8 +354,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
one migration instead of two.
* A->B, B->A => remove A->B and B->A as they do not result
in a new VM placement.
:param model: model_root object
"""
migrate_actions = (
a for a in self.solution.actions if a[
@@ -398,13 +372,15 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
for a in actions:
self.solution.actions.remove(a)
self.number_of_migrations -= 1
src_node = model.get_node_by_uuid(src_uuid)
dst_node = model.get_node_by_uuid(dst_uuid)
instance = model.get_instance_by_uuid(instance_uuid)
if model.migrate_instance(instance, dst_node, src_node):
self.add_migration(instance, src_node, dst_node, model)
src_node = self.compute_model.get_node_by_uuid(src_uuid)
dst_node = self.compute_model.get_node_by_uuid(dst_uuid)
instance = self.compute_model.get_instance_by_uuid(
instance_uuid)
if self.compute_model.migrate_instance(
instance, dst_node, src_node):
self.add_migration(instance, src_node, dst_node)
def offload_phase(self, model, cc):
def offload_phase(self, cc):
"""Perform offloading phase.
This considers provided resource capacity coefficients.
@@ -420,29 +396,28 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
the node enabler in this phase doesn't necessarily results
in more enabled nodes in the final solution.
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
"""
sorted_nodes = sorted(
model.get_all_compute_nodes().values(),
key=lambda x: self.get_node_utilization(x, model)['cpu'])
self.compute_model.get_all_compute_nodes().values(),
key=lambda x: self.get_node_utilization(x)['cpu'])
for node in reversed(sorted_nodes):
if self.is_overloaded(node, model, cc):
if self.is_overloaded(node, cc):
for instance in sorted(
model.get_node_instances(node),
self.compute_model.get_node_instances(node),
key=lambda x: self.get_instance_utilization(
x.uuid, model)['cpu']
x)['cpu']
):
for destination_node in reversed(sorted_nodes):
if self.instance_fits(
instance.uuid, destination_node, model, cc):
instance, destination_node, cc):
self.add_migration(instance, node,
destination_node, model)
destination_node)
break
if not self.is_overloaded(node, model, cc):
if not self.is_overloaded(node, cc):
break
def consolidation_phase(self, model, cc):
def consolidation_phase(self, cc):
"""Perform consolidation phase.
This considers provided resource capacity coefficients.
@@ -454,27 +429,25 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
in the system than less cpu utilizied VMs which can be later used
to fill smaller CPU capacity gaps.
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
"""
sorted_nodes = sorted(
model.get_all_compute_nodes().values(),
key=lambda x: self.get_node_utilization(x, model)['cpu'])
self.compute_model.get_all_compute_nodes().values(),
key=lambda x: self.get_node_utilization(x)['cpu'])
asc = 0
for node in sorted_nodes:
instances = sorted(
model.get_node_instances(node),
key=lambda x: self.get_instance_utilization(
x.uuid, model)['cpu'])
self.compute_model.get_node_instances(node),
key=lambda x: self.get_instance_utilization(x)['cpu'])
for instance in reversed(instances):
dsc = len(sorted_nodes) - 1
for destination_node in reversed(sorted_nodes):
if asc >= dsc:
break
if self.instance_fits(
instance.uuid, destination_node, model, cc):
instance, destination_node, cc):
self.add_migration(instance, node,
destination_node, model)
destination_node)
break
dsc -= 1
asc += 1
@@ -503,25 +476,23 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
:param original_model: root_model object
"""
LOG.info(_LI('Executing Smart Strategy'))
model = self.compute_model
rcu = self.get_relative_cluster_utilization(model)
self.ceilometer_vm_data_cache = dict()
rcu = self.get_relative_cluster_utilization()
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
# Offloading phase
self.offload_phase(model, cc)
self.offload_phase(cc)
# Consolidation phase
self.consolidation_phase(model, cc)
self.consolidation_phase(cc)
# Optimize solution
self.optimize_solution(model)
self.optimize_solution()
# disable unused nodes
self.disable_unused_nodes(model)
self.disable_unused_nodes()
rcu_after = self.get_relative_cluster_utilization(model)
rcu_after = self.get_relative_cluster_utilization()
info = {
"compute_nodes_count": len(
self.compute_model.get_all_compute_nodes()),

View File

@@ -145,17 +145,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
},
}
def calculate_used_resource(self, node, cap_cores, cap_mem,
cap_disk):
def calculate_used_resource(self, node):
"""Calculate the used vcpus, memory and disk based on VM flavors"""
instances = self.compute_model.get_node_instances(node)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for instance in instances:
vcpus_used += cap_cores.get_capacity(instance)
memory_mb_used += cap_mem.get_capacity(instance)
disk_gb_used += cap_disk.get_capacity(instance)
vcpus_used += instance.vcpus
memory_mb_used += instance.memory
disk_gb_used += instance.disk
return vcpus_used, memory_mb_used, disk_gb_used
@@ -200,18 +199,10 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
def filter_destination_hosts(self, hosts, instance_to_migrate,
avg_workload, workload_cache):
'''Only return hosts with sufficient available resources'''
cap_cores = self.compute_model.get_resource_by_uuid(
element.ResourceType.cpu_cores)
cap_disk = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk)
cap_mem = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory)
required_cores = cap_cores.get_capacity(instance_to_migrate)
required_disk = cap_disk.get_capacity(instance_to_migrate)
required_mem = cap_mem.get_capacity(instance_to_migrate)
"""Only return hosts with sufficient available resources"""
required_cores = instance_to_migrate.vcpus
required_disk = instance_to_migrate.disk
required_mem = instance_to_migrate.memory
# filter nodes without enough resource
destination_hosts = []
@@ -221,16 +212,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
workload = instance_data['workload']
# calculate the available resources
cores_used, mem_used, disk_used = self.calculate_used_resource(
host, cap_cores, cap_mem, cap_disk)
cores_available = cap_cores.get_capacity(host) - cores_used
disk_available = cap_disk.get_capacity(host) - disk_used
mem_available = cap_mem.get_capacity(host) - mem_used
host)
cores_available = host.vcpus - cores_used
disk_available = host.disk - disk_used
mem_available = host.memory - mem_used
if (
cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem and
(src_instance_workload + workload) < self.threshold / 100 *
cap_cores.get_capacity(host)
((src_instance_workload + workload) <
self.threshold / 100 * host.vcpus)
):
destination_hosts.append(instance_data)
@@ -249,9 +240,6 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
cluster_size = len(nodes)
if not nodes:
raise wexc.ClusterEmpty()
# get cpu cores capacity of nodes and instances
cap_cores = self.compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
overload_hosts = []
nonoverload_hosts = []
# total workload of cluster
@@ -259,8 +247,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
# use workload_cache to store the workload of VMs for reuse purpose
workload_cache = {}
for node_id in nodes:
node = self.compute_model.get_node_by_uuid(
node_id)
node = self.compute_model.get_node_by_uuid(node_id)
instances = self.compute_model.get_node_instances(node)
node_workload = 0.0
for instance in instances:
@@ -277,19 +264,17 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
if cpu_util is None:
LOG.debug("Instance (%s): cpu_util is None", instance.uuid)
continue
instance_cores = cap_cores.get_capacity(instance)
workload_cache[instance.uuid] = cpu_util * instance_cores / 100
workload_cache[instance.uuid] = cpu_util * instance.vcpus / 100
node_workload += workload_cache[instance.uuid]
LOG.debug("VM (%s): cpu_util %f", instance.uuid, cpu_util)
node_cores = cap_cores.get_capacity(node)
hy_cpu_util = node_workload / node_cores * 100
node_cpu_util = node_workload / node.vcpus * 100
cluster_workload += node_workload
instance_data = {
'node': node, "cpu_util": hy_cpu_util,
'node': node, "cpu_util": node_cpu_util,
'workload': node_workload}
if hy_cpu_util >= self.threshold:
if node_cpu_util >= self.threshold:
# mark the node to release resources
overload_hosts.append(instance_data)
else:

View File

@@ -185,20 +185,17 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
(instance_load['vcpus'] / float(host_vcpus)))
@MEMOIZE
def get_instance_load(self, instance_uuid):
def get_instance_load(self, instance):
"""Gathering instance load through ceilometer statistic.
:param instance_uuid: instance for which statistic is gathered.
:param instance: instance for which statistic is gathered.
:return: dict
"""
LOG.debug('get_instance_load started')
instance_vcpus = self.compute_model.get_resource_by_uuid(
element.ResourceType.cpu_cores).get_capacity(
self.compute_model.get_instance_by_uuid(instance_uuid))
instance_load = {'uuid': instance_uuid, 'vcpus': instance_vcpus}
instance_load = {'uuid': instance.uuid, 'vcpus': instance.vcpus}
for meter in self.metrics:
avg_meter = self.ceilometer.statistic_aggregation(
resource_id=instance_uuid,
resource_id=instance.uuid,
meter_name=meter,
period=self.periods['instance'],
aggregate='min'
@@ -207,8 +204,8 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
LOG.warning(
_LW("No values returned by %(resource_id)s "
"for %(metric_name)s") % dict(
resource_id=instance_uuid,
metric_name=meter))
resource_id=instance.uuid,
metric_name=meter))
avg_meter = 0
if meter == 'cpu_util':
avg_meter /= float(100)
@@ -219,10 +216,8 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
normalized_hosts = copy.deepcopy(hosts)
for host in normalized_hosts:
if 'memory.resident' in normalized_hosts[host]:
h_memory = self.compute_model.get_resource_by_uuid(
element.ResourceType.memory).get_capacity(
self.compute_model.get_node_by_uuid(host))
normalized_hosts[host]['memory.resident'] /= float(h_memory)
node = self.compute_model.get_node_by_uuid(host)
normalized_hosts[host]['memory.resident'] /= float(node.memory)
return normalized_hosts
@@ -237,13 +232,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
hosts_load = {}
for node_id, node in self.get_available_nodes().items():
hosts_load[node_id] = {}
host_vcpus = self.compute_model.get_resource_by_uuid(
element.ResourceType.cpu_cores).get_capacity(
self.compute_model.get_node_by_uuid(node_id))
hosts_load[node_id]['vcpus'] = host_vcpus
hosts_load[node_id]['vcpus'] = node.vcpus
for metric in self.metrics:
resource_id = ''
meter_name = self.instance_metrics[metric]
if re.match('^compute.node', meter_name) is not None:
@@ -294,34 +285,31 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
" for %s in weight dict.") % metric)
return weighted_sd
def calculate_migration_case(self, hosts, instance_id,
src_node_id, dst_node_id):
def calculate_migration_case(self, hosts, instance, src_node, dst_node):
"""Calculate migration case
Return list of standard deviation values, that appearing in case of
migration of instance from source host to destination host
:param hosts: hosts with their workload
:param instance_id: the virtual machine
:param src_node_id: the source node id
:param dst_node_id: the destination node id
:param instance: the virtual machine
:param src_node: the source node
:param dst_node: the destination node
:return: list of standard deviation values
"""
migration_case = []
new_hosts = copy.deepcopy(hosts)
instance_load = self.get_instance_load(instance_id)
d_host_vcpus = new_hosts[dst_node_id]['vcpus']
s_host_vcpus = new_hosts[src_node_id]['vcpus']
instance_load = self.get_instance_load(instance)
s_host_vcpus = new_hosts[src_node.uuid]['vcpus']
d_host_vcpus = new_hosts[dst_node.uuid]['vcpus']
for metric in self.metrics:
if metric is 'cpu_util':
new_hosts[src_node_id][metric] -= self.transform_instance_cpu(
instance_load,
s_host_vcpus)
new_hosts[dst_node_id][metric] += self.transform_instance_cpu(
instance_load,
d_host_vcpus)
new_hosts[src_node.uuid][metric] -= (
self.transform_instance_cpu(instance_load, s_host_vcpus))
new_hosts[dst_node.uuid][metric] += (
self.transform_instance_cpu(instance_load, d_host_vcpus))
else:
new_hosts[src_node_id][metric] -= instance_load[metric]
new_hosts[dst_node_id][metric] += instance_load[metric]
new_hosts[src_node.uuid][metric] -= instance_load[metric]
new_hosts[dst_node.uuid][metric] += instance_load[metric]
normalized_hosts = self.normalize_hosts_load(new_hosts)
for metric in self.metrics:
migration_case.append(self.get_sd(normalized_hosts, metric))
@@ -343,26 +331,27 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
instance_host_map = []
nodes = list(self.get_available_nodes())
for src_node_id in nodes:
src_node = self.compute_model.get_node_by_uuid(src_node_id)
for src_host in nodes:
src_node = self.compute_model.get_node_by_uuid(src_host)
c_nodes = copy.copy(nodes)
c_nodes.remove(src_node_id)
c_nodes.remove(src_host)
node_list = yield_nodes(c_nodes)
for instance in self.compute_model.get_node_instances(src_node):
min_sd_case = {'value': len(self.metrics)}
if instance.state not in [element.InstanceState.ACTIVE.value,
element.InstanceState.PAUSED.value]:
continue
for dst_node_id in next(node_list):
for dst_host in next(node_list):
dst_node = self.compute_model.get_node_by_uuid(dst_host)
sd_case = self.calculate_migration_case(
hosts, instance.uuid, src_node_id, dst_node_id)
hosts, instance, src_node, dst_node)
weighted_sd = self.calculate_weighted_sd(sd_case[:-1])
if weighted_sd < min_sd_case['value']:
min_sd_case = {
'host': dst_node_id, 'value': weighted_sd,
's_host': src_node_id, 'instance': instance.uuid}
'host': dst_node.uuid, 'value': weighted_sd,
's_host': src_node.uuid, 'instance': instance.uuid}
instance_host_map.append(min_sd_case)
return sorted(instance_host_map, key=lambda x: x['value'])
@@ -433,19 +422,16 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
min_sd = 1
balanced = False
for instance_host in migration:
dst_hp_disk = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk).get_capacity(
self.compute_model.get_node_by_uuid(
instance_host['host']))
instance_disk = self.compute_model.get_resource_by_uuid(
element.ResourceType.disk).get_capacity(
self.compute_model.get_instance_by_uuid(
instance_host['instance']))
if instance_disk > dst_hp_disk:
instance = self.compute_model.get_instance_by_uuid(
instance_host['instance'])
src_node = self.compute_model.get_node_by_uuid(
instance_host['s_host'])
dst_node = self.compute_model.get_node_by_uuid(
instance_host['host'])
if instance.disk > dst_node.disk:
continue
instance_load = self.calculate_migration_case(
hosts_load, instance_host['instance'],
instance_host['s_host'], instance_host['host'])
hosts_load, instance, src_node, dst_node)
weighted_sd = self.calculate_weighted_sd(instance_load[:-1])
if weighted_sd < min_sd:
min_sd = weighted_sd

View File

@@ -22,7 +22,6 @@ import os
import mock
from watcher.decision_engine.model.collector import base
from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root as modelroot
@@ -106,16 +105,12 @@ class FakeCeilometerMetrics(object):
node = self.model.get_node_by_uuid(node_uuid)
instances = self.model.get_node_instances(node)
util_sum = 0.0
node_cpu_cores = self.model.get_resource_by_uuid(
element.ResourceType.cpu_cores).get_capacity_by_uuid(node.uuid)
for instance_uuid in instances:
instance_cpu_cores = self.model.get_resource_by_uuid(
element.ResourceType.cpu_cores).\
get_capacity(self.model.get_instance_by_uuid(instance_uuid))
total_cpu_util = instance_cpu_cores * self.get_instance_cpu_util(
instance_uuid)
instance = self.model.get_instance_by_uuid(instance_uuid)
total_cpu_util = instance.vcpus * self.get_instance_cpu_util(
instance.uuid)
util_sum += total_cpu_util / 100.0
util_sum /= node_cpu_cores
util_sum /= node.vcpus
return util_sum * 100.0
@staticmethod

View File

@@ -217,26 +217,18 @@ class TestNovaNotifications(NotificationTestCase):
)
instance0 = compute_model.get_instance_by_uuid(instance0_uuid)
cpu_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk = compute_model.get_resource_by_uuid(
element.ResourceType.disk)
disk_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.disk_capacity)
memory_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.memory)
self.assertEqual(element.InstanceState.PAUSED.value, instance0.state)
self.assertEqual(1, cpu_capacity.get_capacity(instance0))
self.assertEqual(1, disk_capacity.get_capacity(instance0))
self.assertEqual(512, memory_capacity.get_capacity(instance0))
self.assertEqual(1, instance0.vcpus)
self.assertEqual(1, instance0.disk_capacity)
self.assertEqual(512, instance0.memory)
m_get_compute_node_by_hostname.assert_called_once_with('Node_2')
node_2 = compute_model.get_node_by_uuid('Node_2')
self.assertEqual(7777, memory_capacity.get_capacity(node_2))
self.assertEqual(42, cpu_capacity.get_capacity(node_2))
self.assertEqual(974, disk.get_capacity(node_2))
self.assertEqual(1337, disk_capacity.get_capacity(node_2))
self.assertEqual(7777, node_2.memory)
self.assertEqual(42, node_2.vcpus)
self.assertEqual(974, node_2.disk)
self.assertEqual(1337, node_2.disk_capacity)
@mock.patch.object(nova_helper, "NovaHelper")
def test_instance_update_node_notfound_set_unmapped(
@@ -265,20 +257,12 @@ class TestNovaNotifications(NotificationTestCase):
)
instance0 = compute_model.get_instance_by_uuid(instance0_uuid)
cpu_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk = compute_model.get_resource_by_uuid(
element.ResourceType.disk)
disk_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.disk_capacity)
memory_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.memory)
self.assertEqual(element.InstanceState.PAUSED.value, instance0.state)
self.assertEqual(1, cpu_capacity.get_capacity(instance0))
self.assertEqual(1, disk.get_capacity(instance0))
self.assertEqual(1, disk_capacity.get_capacity(instance0))
self.assertEqual(512, memory_capacity.get_capacity(instance0))
self.assertEqual(1, instance0.vcpus)
self.assertEqual(1, instance0.disk)
self.assertEqual(1, instance0.disk_capacity)
self.assertEqual(512, instance0.memory)
m_get_compute_node_by_hostname.assert_any_call('Node_2')
self.assertRaises(
@@ -306,17 +290,11 @@ class TestNovaNotifications(NotificationTestCase):
)
instance0 = compute_model.get_instance_by_uuid(instance0_uuid)
cpu_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.disk)
memory_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.memory)
self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state)
self.assertEqual(1, cpu_capacity.get_capacity(instance0))
self.assertEqual(1, disk_capacity.get_capacity(instance0))
self.assertEqual(512, memory_capacity.get_capacity(instance0))
self.assertEqual(1, instance0.vcpus)
self.assertEqual(1, instance0.disk_capacity)
self.assertEqual(512, instance0.memory)
def test_nova_instance_delete_end(self):
compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes()
@@ -374,17 +352,11 @@ class TestLegacyNovaNotifications(NotificationTestCase):
)
instance0 = compute_model.get_instance_by_uuid(instance0_uuid)
cpu_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.disk)
memory_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.memory)
self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state)
self.assertEqual(1, cpu_capacity.get_capacity(instance0))
self.assertEqual(1, disk_capacity.get_capacity(instance0))
self.assertEqual(512, memory_capacity.get_capacity(instance0))
self.assertEqual(1, instance0.vcpus)
self.assertEqual(1, instance0.disk_capacity)
self.assertEqual(512, instance0.memory)
def test_legacy_instance_updated(self):
compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes()
@@ -445,27 +417,19 @@ class TestLegacyNovaNotifications(NotificationTestCase):
)
instance0 = compute_model.get_instance_by_uuid(instance0_uuid)
cpu_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk = compute_model.get_resource_by_uuid(
element.ResourceType.disk)
disk_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.disk_capacity)
memory_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.memory)
self.assertEqual(element.InstanceState.PAUSED.value, instance0.state)
self.assertEqual(1, cpu_capacity.get_capacity(instance0))
self.assertEqual(1, disk.get_capacity(instance0))
self.assertEqual(1, disk_capacity.get_capacity(instance0))
self.assertEqual(512, memory_capacity.get_capacity(instance0))
self.assertEqual(1, instance0.vcpus)
self.assertEqual(1, instance0.disk)
self.assertEqual(1, instance0.disk_capacity)
self.assertEqual(512, instance0.memory)
m_get_compute_node_by_hostname.assert_any_call('Node_2')
node_2 = compute_model.get_node_by_uuid('Node_2')
self.assertEqual(7777, memory_capacity.get_capacity(node_2))
self.assertEqual(42, cpu_capacity.get_capacity(node_2))
self.assertEqual(974, disk.get_capacity(node_2))
self.assertEqual(1337, disk_capacity.get_capacity(node_2))
self.assertEqual(7777, node_2.memory)
self.assertEqual(42, node_2.vcpus)
self.assertEqual(974, node_2.disk)
self.assertEqual(1337, node_2.disk_capacity)
@mock.patch.object(nova_helper, "NovaHelper")
def test_legacy_instance_update_node_notfound_set_unmapped(
@@ -494,20 +458,12 @@ class TestLegacyNovaNotifications(NotificationTestCase):
)
instance0 = compute_model.get_instance_by_uuid(instance0_uuid)
cpu_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.vcpus)
disk = compute_model.get_resource_by_uuid(
element.ResourceType.disk)
disk_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.disk_capacity)
memory_capacity = compute_model.get_resource_by_uuid(
element.ResourceType.memory)
self.assertEqual(element.InstanceState.PAUSED.value, instance0.state)
self.assertEqual(1, cpu_capacity.get_capacity(instance0))
self.assertEqual(1, disk.get_capacity(instance0))
self.assertEqual(1, disk_capacity.get_capacity(instance0))
self.assertEqual(512, memory_capacity.get_capacity(instance0))
self.assertEqual(1, instance0.vcpus)
self.assertEqual(1, instance0.disk)
self.assertEqual(1, instance0.disk_capacity)
self.assertEqual(512, instance0.memory)
m_get_compute_node_by_hostname.assert_any_call('Node_2')
self.assertRaises(

View File

@@ -22,7 +22,6 @@ import mock
from watcher.applier.loading import default
from watcher.common import exception
from watcher.common import utils
from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
@@ -69,15 +68,12 @@ class TestOutletTempControl(base.TestCase):
self.strategy.input_parameters.update({'threshold': 34.3})
self.strategy.threshold = 34.3
def test_calc_used_res(self):
def test_calc_used_resource(self):
model = self.fake_cluster.generate_scenario_3_with_2_nodes()
self.m_model.return_value = model
node = model.get_node_by_uuid('Node_0')
cap_cores = model.get_resource_by_uuid(element.ResourceType.cpu_cores)
cap_mem = model.get_resource_by_uuid(element.ResourceType.memory)
cap_disk = model.get_resource_by_uuid(element.ResourceType.disk)
cores_used, mem_used, disk_used = self.strategy.calc_used_res(
node, cap_cores, cap_mem, cap_disk)
cores_used, mem_used, disk_used = self.strategy.calc_used_resource(
node)
self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used))

View File

@@ -22,7 +22,6 @@ import mock
from watcher.applier.loading import default
from watcher.common import exception
from watcher.common import utils
from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
@@ -74,16 +73,12 @@ class TestUniformAirflow(base.TestCase):
self.strategy.threshold_power = 350
self._period = 300
def test_calc_used_res(self):
def test_calc_used_resource(self):
model = self.fake_cluster.generate_scenario_7_with_2_nodes()
self.m_model.return_value = model
node = model.get_node_by_uuid('Node_0')
cap_cores = model.get_resource_by_uuid(element.ResourceType.cpu_cores)
cap_mem = model.get_resource_by_uuid(element.ResourceType.memory)
cap_disk = model.get_resource_by_uuid(element.ResourceType.disk)
cores_used, mem_used, disk_used = self.\
strategy.calculate_used_resource(
node, cap_cores, cap_mem, cap_disk)
cores_used, mem_used, disk_used = (
self.strategy.calculate_used_resource(node))
self.assertEqual((cores_used, mem_used, disk_used), (25, 4, 40))
def test_group_hosts_by_airflow(self):

View File

@@ -81,7 +81,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
instance_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(
instance_util,
self.strategy.get_instance_utilization(instance_0.uuid, model))
self.strategy.get_instance_utilization(instance_0, model))
def test_get_node_utilization(self):
model = self.fake_cluster.generate_scenario_1()
@@ -99,16 +99,14 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.fake_metrics.model = model
node_0 = model.get_node_by_uuid("Node_0")
node_util = dict(cpu=40, ram=64, disk=250)
self.assertEqual(node_util,
self.strategy.get_node_capacity(node_0, model))
self.assertEqual(node_util, self.strategy.get_node_capacity(node_0))
def test_get_relative_node_utilization(self):
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
self.fake_metrics.model = model
node = model.get_node_by_uuid('Node_0')
rhu = self.strategy.get_relative_node_utilization(
node, model)
rhu = self.strategy.get_relative_node_utilization(node)
expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025}
self.assertEqual(expected_rhu, rhu)
@@ -116,7 +114,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
self.fake_metrics.model = model
cru = self.strategy.get_relative_cluster_utilization(model)
cru = self.strategy.get_relative_cluster_utilization()
expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375}
self.assertEqual(expected_cru, cru)
@@ -128,7 +126,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
n2 = model.get_node_by_uuid('Node_1')
instance_uuid = 'INSTANCE_0'
instance = model.get_instance_by_uuid(instance_uuid)
self.strategy.add_migration(instance, n1, n2, model)
self.strategy.add_migration(instance, n1, n2)
self.assertEqual(1, len(self.strategy.solution.actions))
expected = {'action_type': 'migrate',
'input_parameters': {'destination_node': n2.uuid,
@@ -143,15 +141,15 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.fake_metrics.model = model
n1 = model.get_node_by_uuid('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = self.strategy.is_overloaded(n1, model, cc)
res = self.strategy.is_overloaded(n1, cc)
self.assertFalse(res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = self.strategy.is_overloaded(n1, model, cc)
res = self.strategy.is_overloaded(n1, cc)
self.assertFalse(res)
cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0}
res = self.strategy.is_overloaded(n1, model, cc)
res = self.strategy.is_overloaded(n1, cc)
self.assertTrue(res)
def test_instance_fits(self):
@@ -159,13 +157,13 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.m_model.return_value = model
self.fake_metrics.model = model
n = model.get_node_by_uuid('Node_1')
instance_uuid = 'INSTANCE_0'
instance0 = model.get_instance_by_uuid('INSTANCE_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = self.strategy.instance_fits(instance_uuid, n, model, cc)
res = self.strategy.instance_fits(instance0, n, cc)
self.assertTrue(res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = self.strategy.instance_fits(instance_uuid, n, model, cc)
res = self.strategy.instance_fits(instance0, n, cc)
self.assertFalse(res)
def test_add_action_enable_compute_node(self):
@@ -198,13 +196,13 @@ class TestVMWorkloadConsolidation(base.TestCase):
n2 = model.get_node_by_uuid('Node_1')
instance_uuid = 'INSTANCE_0'
instance = model.get_instance_by_uuid(instance_uuid)
self.strategy.disable_unused_nodes(model)
self.strategy.disable_unused_nodes()
self.assertEqual(0, len(self.strategy.solution.actions))
# Migrate VM to free the node
self.strategy.add_migration(instance, n1, n2, model)
self.strategy.add_migration(instance, n1, n2)
self.strategy.disable_unused_nodes(model)
self.strategy.disable_unused_nodes()
expected = {'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'disabled',
'resource_id': 'Node_0'}}
@@ -216,7 +214,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.m_model.return_value = model
self.fake_metrics.model = model
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
self.strategy.offload_phase(model, cc)
self.strategy.offload_phase(cc)
expected = []
self.assertEqual(expected, self.strategy.solution.actions)
@@ -228,7 +226,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
n2 = model.get_node_by_uuid('Node_1')
instance_uuid = 'INSTANCE_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
self.strategy.consolidation_phase(model, cc)
self.strategy.consolidation_phase(cc)
expected = [{'action_type': 'migrate',
'input_parameters': {'destination_node': n2.uuid,
'source_node': n1.uuid,
@@ -242,9 +240,9 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.fake_metrics.model = model
n1 = model.get_node_by_uuid('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
self.strategy.offload_phase(model, cc)
self.strategy.consolidation_phase(model, cc)
self.strategy.optimize_solution(model)
self.strategy.offload_phase(cc)
self.strategy.consolidation_phase(cc)
self.strategy.optimize_solution()
n2 = self.strategy.solution.actions[0][
'input_parameters']['destination_node']
expected = [{'action_type': 'migrate',
@@ -267,7 +265,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
n1 = model.get_node_by_uuid('Node_0')
n2 = model.get_node_by_uuid('Node_1')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
self.strategy.offload_phase(model, cc)
self.strategy.offload_phase(cc)
expected = [{'action_type': 'migrate',
'input_parameters': {'destination_node': n2.uuid,
'migration_type': 'live',
@@ -284,14 +282,14 @@ class TestVMWorkloadConsolidation(base.TestCase):
'resource_id': 'INSTANCE_8',
'source_node': n1.uuid}}]
self.assertEqual(expected, self.strategy.solution.actions)
self.strategy.consolidation_phase(model, cc)
self.strategy.consolidation_phase(cc)
expected.append({'action_type': 'migrate',
'input_parameters': {'destination_node': n1.uuid,
'migration_type': 'live',
'resource_id': 'INSTANCE_7',
'source_node': n2.uuid}})
self.assertEqual(expected, self.strategy.solution.actions)
self.strategy.optimize_solution(model)
self.strategy.optimize_solution()
del expected[3]
del expected[1]
self.assertEqual(expected, self.strategy.solution.actions)

View File

@@ -22,7 +22,6 @@ import mock
from watcher.applier.loading import default
from watcher.common import exception
from watcher.common import utils
from watcher.decision_engine.model import element
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
@@ -70,16 +69,12 @@ class TestWorkloadBalance(base.TestCase):
self.strategy.threshold = 25.0
self.strategy._period = 300
def test_calc_used_res(self):
def test_calc_used_resource(self):
model = self.fake_cluster.generate_scenario_6_with_2_nodes()
self.m_model.return_value = model
node = model.get_node_by_uuid('Node_0')
cap_cores = model.get_resource_by_uuid(element.ResourceType.vcpus)
cap_mem = model.get_resource_by_uuid(element.ResourceType.memory)
cap_disk = model.get_resource_by_uuid(element.ResourceType.disk)
cores_used, mem_used, disk_used = (
self.strategy.calculate_used_resource(
node, cap_cores, cap_mem, cap_disk))
self.strategy.calculate_used_resource(node))
self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40))

View File

@@ -93,25 +93,29 @@ class TestWorkloadStabilization(base.TestCase):
self.strategy.periods = {"instance": 720, "node": 600}
def test_get_instance_load(self):
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
instance0 = model.get_instance_by_uuid("INSTANCE_0")
instance_0_dict = {
'uuid': 'INSTANCE_0', 'vcpus': 10,
'cpu_util': 0.07, 'memory.resident': 2}
self.assertEqual(
instance_0_dict, self.strategy.get_instance_load("INSTANCE_0"))
instance_0_dict, self.strategy.get_instance_load(instance0))
def test_periods(self):
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
p_ceilometer = mock.patch.object(
strategies.WorkloadStabilization, "ceilometer")
m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy.get_instance_load("INSTANCE_0")
instance0 = model.get_instance_by_uuid("INSTANCE_0")
self.strategy.get_instance_load(instance0)
m_ceilometer.statistic_aggregation.assert_called_with(
aggregate='min', meter_name='memory.resident',
period=720, resource_id='INSTANCE_0')
period=720, resource_id=instance0.uuid)
self.strategy.get_hosts_load()
m_ceilometer.statistic_aggregation.assert_called_with(
aggregate='avg', meter_name='hardware.memory.used',
@@ -158,10 +162,14 @@ class TestWorkloadStabilization(base.TestCase):
self.assertEqual(self.strategy.calculate_weighted_sd(sd_case), 1.25)
def test_calculate_migration_case(self):
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
instance = model.get_instance_by_uuid("INSTANCE_5")
src_node = model.get_node_by_uuid("Node_2")
dst_node = model.get_node_by_uuid("Node_1")
result = self.strategy.calculate_migration_case(
self.hosts_load_assert, "INSTANCE_5", "Node_2", "Node_1")[-1][
"Node_1"]
self.hosts_load_assert, instance,
src_node, dst_node)[-1][dst_node.uuid]
result['cpu_util'] = round(result['cpu_util'], 3)
self.assertEqual(result, {'cpu_util': 0.095, 'memory.resident': 21.0,
'vcpus': 40})

View File

@@ -30,7 +30,7 @@ CONF = config.CONF
class TestExecuteBasicStrategy(base.BaseInfraOptimScenarioTest):
"""Tests for action plans"""
BASIC_GOAL = "server_consolidation"
GOAL_NAME = "server_consolidation"
@classmethod
def skip_checks(cls):
@@ -117,12 +117,15 @@ class TestExecuteBasicStrategy(base.BaseInfraOptimScenarioTest):
all_hosts = host_client.list_hosts()['hosts']
compute_nodes = [x for x in all_hosts if x['service'] == 'compute']
for _ in compute_nodes[:CONF.compute.min_compute_nodes]:
for idx, _ in enumerate(
compute_nodes[:CONF.compute.min_compute_nodes], start=1):
# by getting to active state here, this means this has
# landed on the host in question.
self.create_server(image_id=CONF.compute.image_ref,
wait_until='ACTIVE',
clients=self.mgr)
self.create_server(
name="instance-%d" % idx,
image_id=CONF.compute.image_ref,
wait_until='ACTIVE',
clients=self.mgr)
def test_execute_basic_action_plan(self):
"""Execute an action plan based on the BASIC strategy
@@ -136,7 +139,7 @@ class TestExecuteBasicStrategy(base.BaseInfraOptimScenarioTest):
self.addCleanup(self.rollback_compute_nodes_status)
self._create_one_instance_per_host()
_, goal = self.client.show_goal(self.BASIC_GOAL)
_, goal = self.client.show_goal(self.GOAL_NAME)
_, strategy = self.client.show_strategy("basic")
_, audit_template = self.create_audit_template(
goal['uuid'], strategy=strategy['uuid'])