diff --git a/watcher/decision_engine/model/collector/base.py b/watcher/decision_engine/model/collector/base.py index dd696701c..b251a0caa 100644 --- a/watcher/decision_engine/model/collector/base.py +++ b/watcher/decision_engine/model/collector/base.py @@ -108,12 +108,15 @@ import copy import threading from oslo_config import cfg +from oslo_log import log import six from watcher.common import clients from watcher.common.loader import loadable from watcher.decision_engine.model import model_root +LOG = log.getLogger(__name__) + @six.add_metaclass(abc.ABCMeta) class BaseClusterDataModelCollector(loadable.LoadableSingleton): @@ -169,6 +172,8 @@ class BaseClusterDataModelCollector(loadable.LoadableSingleton): ] def get_latest_cluster_data_model(self): + LOG.debug("Creating copy") + LOG.debug(self.cluster_data_model.to_xml()) return copy.deepcopy(self.cluster_data_model) def synchronize(self): diff --git a/watcher/decision_engine/model/collector/nova.py b/watcher/decision_engine/model/collector/nova.py index 1b8559497..ef6aaac17 100644 --- a/watcher/decision_engine/model/collector/nova.py +++ b/watcher/decision_engine/model/collector/nova.py @@ -132,7 +132,6 @@ class ModelBuilder(object): compute_service = self.nova_helper.get_service(node.service["id"]) node_attributes = { "id": node.id, - "human_id": None, # TODO(v-francoise): get rid of it "uuid": compute_service.host, "hostname": node.hypervisor_hostname, "memory": node.memory_mb, diff --git a/watcher/decision_engine/model/element/__init__.py b/watcher/decision_engine/model/element/__init__.py index 51ec0302a..69cd050ec 100644 --- a/watcher/decision_engine/model/element/__init__.py +++ b/watcher/decision_engine/model/element/__init__.py @@ -16,10 +16,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from watcher.decision_engine.model.element import disk_info from watcher.decision_engine.model.element import instance from watcher.decision_engine.model.element import node -from watcher.decision_engine.model.element import resource ServiceState = node.ServiceState ComputeNode = node.ComputeNode @@ -27,12 +25,4 @@ ComputeNode = node.ComputeNode InstanceState = instance.InstanceState Instance = instance.Instance -DiskInfo = disk_info.DiskInfo - -ResourceType = resource.ResourceType -Resource = resource.Resource - - -__all__ = [ - 'ServiceState', 'ComputeNode', 'InstanceState', 'Instance', - 'DiskInfo', 'ResourceType', 'Resource'] +__all__ = ['ServiceState', 'ComputeNode', 'InstanceState', 'Instance'] diff --git a/watcher/decision_engine/model/element/disk_info.py b/watcher/decision_engine/model/element/disk_info.py deleted file mode 100644 index e408aee88..000000000 --- a/watcher/decision_engine/model/element/disk_info.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.model.element import base - - -class DiskInfo(base.Element): - - def __init__(self): - self.name = "" - self.major = 0 - self.minor = 0 - self.size = 0 - self.scheduler = "" - - def accept(self, visitor): - raise NotImplementedError() - - def set_size(self, size): - """DiskInfo - - :param size: Size in bytes - """ - self.size = size - - def get_size(self): - return self.size - - def set_scheduler(self, scheduler): - """DiskInfo - - I/O Scheduler noop cfq deadline - :param scheduler: - :return: - """ - self.scheduler = scheduler - - def set_device_name(self, name): - """Device name - - :param name: - """ - self.name = name - - def get_device_name(self): - return self.name diff --git a/watcher/decision_engine/model/element/resource.py b/watcher/decision_engine/model/element/resource.py deleted file mode 100644 index 720213ae5..000000000 --- a/watcher/decision_engine/model/element/resource.py +++ /dev/null @@ -1,64 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import enum - -from watcher.common import exception - - -class ResourceType(enum.Enum): - cpu_cores = 'vcpus' - vcpus = 'vcpus' - memory = 'memory' - disk = 'disk' - disk_capacity = 'disk_capacity' - - -class Resource(object): - def __init__(self, name, capacity=None): - """Resource - - :param name: ResourceType - :param capacity: max - :return: - """ - self._name = name - self.capacity = capacity - self.mapping = {} - - @property - def name(self): - return self._name - - @name.setter - def name(self, n): - self._name = n - - def set_capacity(self, element, value): - self.mapping[element.uuid] = value - - def unset_capacity(self, element): - del self.mapping[element.uuid] - - def get_capacity_by_uuid(self, uuid): - try: - return self.mapping[str(uuid)] - except KeyError: - raise exception.CapacityNotDefined( - capacity=self.name.value, resource=str(uuid)) - - def get_capacity(self, element): - return self.get_capacity_by_uuid(element.uuid) diff --git a/watcher/decision_engine/model/model_root.py b/watcher/decision_engine/model/model_root.py index 6c1c1d6a4..f82d0360a 100644 --- a/watcher/decision_engine/model/model_root.py +++ b/watcher/decision_engine/model/model_root.py @@ -172,22 +172,6 @@ class ModelRoot(nx.DiGraph, base.Model): return {inst.uuid: inst for inst in self.nodes() if isinstance(inst, element.Instance)} - @lockutils.synchronized("model_root") - def get_resource_by_uuid(self, resource_id): - # TODO(v-francoise): deprecate this method - # This is a trick to keep the compatibility with the old model root - class Resource(object): - def __init__(self, resource_id): - if isinstance(resource_id, element.ResourceType): - resource_id = resource_id.value - self.resource_id = resource_id - - def get_capacity(self, element): - # We ignore element because value already contains the value - return getattr(element, self.resource_id) - - return Resource(resource_id) - @lockutils.synchronized("model_root") def get_node_instances(self, node): self.assert_node(node) diff --git a/watcher/decision_engine/model/notification/nova.py b/watcher/decision_engine/model/notification/nova.py index 976e3fa4d..8e450f790 100644 --- a/watcher/decision_engine/model/notification/nova.py +++ b/watcher/decision_engine/model/notification/nova.py @@ -64,23 +64,19 @@ class NovaNotification(base.NotificationEndpoint): instance_data = data['nova_object.data'] instance_flavor_data = instance_data['flavor']['nova_object.data'] - instance.update({ - 'state': instance_data['state'], - 'hostname': instance_data['host_name'], - 'human_id': instance_data['display_name'], - }) - memory_mb = instance_flavor_data['memory_mb'] num_cores = instance_flavor_data['vcpus'] disk_gb = instance_flavor_data['root_gb'] - self.update_capacity(element.ResourceType.memory, instance, memory_mb) - self.update_capacity( - element.ResourceType.vcpus, instance, num_cores) - self.update_capacity( - element.ResourceType.disk, instance, disk_gb) - self.update_capacity( - element.ResourceType.disk_capacity, instance, disk_gb) + instance.update({ + 'state': instance_data['state'], + 'hostname': instance_data['host_name'], + 'human_id': instance_data['display_name'], + 'memory': memory_mb, + 'vcpus': num_cores, + 'disk': disk_gb, + 'disk_capacity': disk_gb, + }) try: node = self.get_or_create_node(instance_data['host']) @@ -91,27 +87,20 @@ class NovaNotification(base.NotificationEndpoint): self.update_instance_mapping(instance, node) - def update_capacity(self, resource_id, obj, value): - setattr(obj, resource_id.value, value) - def legacy_update_instance(self, instance, data): - instance.update({ - 'state': data['state'], - 'hostname': data['hostname'], - 'human_id': data['display_name'], - }) - memory_mb = data['memory_mb'] num_cores = data['vcpus'] disk_gb = data['root_gb'] - self.update_capacity(element.ResourceType.memory, instance, memory_mb) - self.update_capacity( - element.ResourceType.vcpus, instance, num_cores) - self.update_capacity( - element.ResourceType.disk, instance, disk_gb) - self.update_capacity( - element.ResourceType.disk_capacity, instance, disk_gb) + instance.update({ + 'state': data['state'], + 'hostname': data['hostname'], + 'human_id': data['display_name'], + 'memory': memory_mb, + 'vcpus': num_cores, + 'disk': disk_gb, + 'disk_capacity': disk_gb, + }) try: node = self.get_or_create_node(data['host']) @@ -147,16 +136,12 @@ class NovaNotification(base.NotificationEndpoint): uuid=node_hostname, hostname=_node.hypervisor_hostname, state=_node.state, - status=_node.status) - - self.update_capacity( - element.ResourceType.memory, node, _node.memory_mb) - self.update_capacity( - element.ResourceType.vcpus, node, _node.vcpus) - self.update_capacity( - element.ResourceType.disk, node, _node.free_disk_gb) - self.update_capacity( - element.ResourceType.disk_capacity, node, _node.local_gb) + status=_node.status, + memory=_node.memory_mb, + vcpus=_node.vcpus, + disk=_node.free_disk_gb, + disk_capacity=_node.local_gb, + ) return node except Exception as exc: LOG.exception(exc) @@ -176,6 +161,7 @@ class NovaNotification(base.NotificationEndpoint): node = self.create_compute_node(uuid) LOG.debug("New compute node created: %s", uuid) self.cluster_data_model.add_node(node) + LOG.debug("New compute node mapped: %s", uuid) return node def update_instance_mapping(self, instance, node): diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index 63287ae8f..5be1a9bb2 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -173,23 +173,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): total_cores = 0 total_disk = 0 total_mem = 0 - cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk) - memory_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory) - for instance in self.compute_model.get_node_instances( destination_node): - total_cores += cpu_capacity.get_capacity(instance) - total_disk += disk_capacity.get_capacity(instance) - total_mem += memory_capacity.get_capacity(instance) + total_cores += instance.vcpus + total_disk += instance.disk + total_mem += instance.memory # capacity requested by the compute node - total_cores += cpu_capacity.get_capacity(instance_to_migrate) - total_disk += disk_capacity.get_capacity(instance_to_migrate) - total_mem += memory_capacity.get_capacity(instance_to_migrate) + total_cores += instance_to_migrate.vcpus + total_disk += instance_to_migrate.disk + total_mem += instance_to_migrate.memory return self.check_threshold(destination_node, total_cores, total_disk, total_mem) @@ -208,12 +201,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :param total_mem: total memory used by the virtual machine :return: True if the threshold is not exceed """ - cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus).get_capacity(destination_node) - disk_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk).get_capacity(destination_node) - memory_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory).get_capacity(destination_node) + cpu_capacity = destination_node.vcpus + disk_capacity = destination_node.disk + memory_capacity = destination_node.memory return (cpu_capacity >= total_cores * self.threshold_cores and disk_capacity >= total_disk * self.threshold_disk and @@ -229,14 +219,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :param total_memory_used: :return: """ - cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus).get_capacity(compute_resource) - - disk_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk).get_capacity(compute_resource) - - memory_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory).get_capacity(compute_resource) + cpu_capacity = compute_resource.vcpus + disk_capacity = compute_resource.disk + memory_capacity = compute_resource.memory score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) / float(cpu_capacity)) @@ -331,10 +316,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): self.config.datasource]['host_cpu_usage'])) host_avg_cpu_util = 100 - cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus).get_capacity(node) - - total_cores_used = cpu_capacity * (host_avg_cpu_util / 100.0) + total_cores_used = node.vcpus * (host_avg_cpu_util / 100.0) return self.calculate_weight(node, total_cores_used, 0, 0) @@ -354,10 +336,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): self.config.datasource]['instance_cpu_usage'])) instance_cpu_utilization = 100 - cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus).get_capacity(instance) - - total_cores_used = cpu_capacity * (instance_cpu_utilization / 100.0) + total_cores_used = instance.vcpus * (instance_cpu_utilization / 100.0) return self.calculate_weight(instance, total_cores_used, 0, 0) diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index ed1c17318..c260e462c 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -121,17 +121,16 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def ceilometer(self, c): self._ceilometer = c - def calc_used_res(self, node, cpu_capacity, - memory_capacity, disk_capacity): + def calc_used_resource(self, node): """Calculate the used vcpus, memory and disk based on VM flavors""" instances = self.compute_model.get_node_instances(node) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 for instance in instances: - vcpus_used += cpu_capacity.get_capacity(instance) - memory_mb_used += memory_capacity.get_capacity(instance) - disk_gb_used += disk_capacity.get_capacity(instance) + vcpus_used += instance.vcpus + memory_mb_used += instance.memory + disk_gb_used += instance.disk return vcpus_used, memory_mb_used, disk_gb_used @@ -189,27 +188,19 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def filter_dest_servers(self, hosts, instance_to_migrate): """Only return hosts with sufficient available resources""" - cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk) - memory_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory) - - required_cores = cpu_capacity.get_capacity(instance_to_migrate) - required_disk = disk_capacity.get_capacity(instance_to_migrate) - required_memory = memory_capacity.get_capacity(instance_to_migrate) + required_cores = instance_to_migrate.vcpus + required_disk = instance_to_migrate.disk + required_memory = instance_to_migrate.memory # filter nodes without enough resource dest_servers = [] for instance_data in hosts: host = instance_data['node'] # available - cores_used, mem_used, disk_used = self.calc_used_res( - host, cpu_capacity, memory_capacity, disk_capacity) - cores_available = cpu_capacity.get_capacity(host) - cores_used - disk_available = disk_capacity.get_capacity(host) - disk_used - mem_available = memory_capacity.get_capacity(host) - mem_used + cores_used, mem_used, disk_used = self.calc_used_resource(host) + cores_available = host.vcpus - cores_used + disk_available = host.disk - disk_used + mem_available = host.memory - mem_used if cores_available >= required_cores \ and disk_available >= required_disk \ and mem_available >= required_memory: diff --git a/watcher/decision_engine/strategy/strategies/uniform_airflow.py b/watcher/decision_engine/strategy/strategies/uniform_airflow.py index ca98c09e7..8eafbdc2c 100644 --- a/watcher/decision_engine/strategy/strategies/uniform_airflow.py +++ b/watcher/decision_engine/strategy/strategies/uniform_airflow.py @@ -164,16 +164,16 @@ class UniformAirflow(base.BaseStrategy): }, } - def calculate_used_resource(self, node, cap_cores, cap_mem, cap_disk): + def calculate_used_resource(self, node): """Compute the used vcpus, memory and disk based on instance flavors""" instances = self.compute_model.get_node_instances(node) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 for instance in instances: - vcpus_used += cap_cores.get_capacity(instance) - memory_mb_used += cap_mem.get_capacity(instance) - disk_gb_used += cap_disk.get_capacity(instance) + vcpus_used += instance.vcpus + memory_mb_used += instance.memory + disk_gb_used += instance.disk return vcpus_used, memory_mb_used, disk_gb_used @@ -221,23 +221,16 @@ class UniformAirflow(base.BaseStrategy): def filter_destination_hosts(self, hosts, instances_to_migrate): """Find instance and host with sufficient available resources""" - - cap_cores = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) - cap_disk = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk) - cap_mem = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory) - # large instance go first + # large instances go first instances_to_migrate = sorted( instances_to_migrate, reverse=True, - key=lambda x: (cap_cores.get_capacity(x))) + key=lambda x: (x.vcpus)) # find hosts for instances destination_hosts = [] for instance_to_migrate in instances_to_migrate: - required_cores = cap_cores.get_capacity(instance_to_migrate) - required_disk = cap_disk.get_capacity(instance_to_migrate) - required_mem = cap_mem.get_capacity(instance_to_migrate) + required_cores = instance_to_migrate.vcpus + required_disk = instance_to_migrate.disk + required_mem = instance_to_migrate.memory dest_migrate_info = {} for nodemap in hosts: host = nodemap['node'] @@ -245,13 +238,13 @@ class UniformAirflow(base.BaseStrategy): # calculate the available resources nodemap['cores_used'], nodemap['mem_used'],\ nodemap['disk_used'] = self.calculate_used_resource( - host, cap_cores, cap_mem, cap_disk) - cores_available = (cap_cores.get_capacity(host) - + host) + cores_available = (host.vcpus - nodemap['cores_used']) - disk_available = (cap_disk.get_capacity(host) - + disk_available = (host.disk - nodemap['disk_used']) mem_available = ( - cap_mem.get_capacity(host) - nodemap['mem_used']) + host.memory - nodemap['mem_used']) if (cores_available >= required_cores and disk_available >= required_disk and mem_available >= required_mem): diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py index 1f80478cc..9a65134ef 100644 --- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py @@ -139,14 +139,12 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): input_parameters=params) self.number_of_released_nodes += 1 - def add_migration(self, instance, source_node, - destination_node, model): + def add_migration(self, instance, source_node, destination_node): """Add an action for VM migration into the solution. :param instance: instance object :param source_node: node object :param destination_node: node object - :param model: model_root object :return: None """ instance_state_str = self.get_state_str(instance.state) @@ -168,7 +166,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): if destination_node_state_str == element.ServiceState.DISABLED.value: self.add_action_enable_compute_node(destination_node) - if model.migrate_instance(instance, source_node, destination_node): + if self.compute_model.migrate_instance( + instance, source_node, destination_node): params = {'migration_type': migration_type, 'source_node': source_node.uuid, 'destination_node': destination_node.uuid} @@ -177,30 +176,28 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): input_parameters=params) self.number_of_migrations += 1 - def disable_unused_nodes(self, model): + def disable_unused_nodes(self): """Generate actions for disablity of unused nodes. - :param model: model_root object :return: None """ - for node in model.get_all_compute_nodes().values(): - if (len(model.get_node_instances(node)) == 0 and + for node in self.compute_model.get_all_compute_nodes().values(): + if (len(self.compute_model.get_node_instances(node)) == 0 and node.status != element.ServiceState.DISABLED.value): self.add_action_disable_node(node) - def get_instance_utilization(self, instance_uuid, model, + def get_instance_utilization(self, instance, period=3600, aggr='avg'): """Collect cpu, ram and disk utilization statistics of a VM. - :param instance_uuid: instance object - :param model: model_root object + :param instance: instance object :param period: seconds :param aggr: string :return: dict(cpu(number of vcpus used), ram(MB used), disk(B used)) """ - if instance_uuid in self.ceilometer_instance_data_cache.keys(): - return self.ceilometer_instance_data_cache.get(instance_uuid) + if instance.uuid in self.ceilometer_instance_data_cache.keys(): + return self.ceilometer_instance_data_cache.get(instance.uuid) cpu_util_metric = 'cpu_util' ram_util_metric = 'memory.usage' @@ -208,58 +205,54 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): ram_alloc_metric = 'memory' disk_alloc_metric = 'disk.root.size' instance_cpu_util = self.ceilometer.statistic_aggregation( - resource_id=instance_uuid, meter_name=cpu_util_metric, + resource_id=instance.uuid, meter_name=cpu_util_metric, period=period, aggregate=aggr) - instance_cpu_cores = model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity( - model.get_instance_by_uuid(instance_uuid)) if instance_cpu_util: total_cpu_utilization = ( - instance_cpu_cores * (instance_cpu_util / 100.0)) + instance.vcpus * (instance_cpu_util / 100.0)) else: - total_cpu_utilization = instance_cpu_cores + total_cpu_utilization = instance.vcpus instance_ram_util = self.ceilometer.statistic_aggregation( - resource_id=instance_uuid, meter_name=ram_util_metric, + resource_id=instance.uuid, meter_name=ram_util_metric, period=period, aggregate=aggr) if not instance_ram_util: instance_ram_util = self.ceilometer.statistic_aggregation( - resource_id=instance_uuid, meter_name=ram_alloc_metric, + resource_id=instance.uuid, meter_name=ram_alloc_metric, period=period, aggregate=aggr) instance_disk_util = self.ceilometer.statistic_aggregation( - resource_id=instance_uuid, meter_name=disk_alloc_metric, + resource_id=instance.uuid, meter_name=disk_alloc_metric, period=period, aggregate=aggr) if not instance_ram_util or not instance_disk_util: LOG.error( _LE('No values returned by %s for memory.usage ' - 'or disk.root.size'), instance_uuid) + 'or disk.root.size'), instance.uuid) raise exception.NoDataFound - self.ceilometer_instance_data_cache[instance_uuid] = dict( + self.ceilometer_instance_data_cache[instance.uuid] = dict( cpu=total_cpu_utilization, ram=instance_ram_util, disk=instance_disk_util) - return self.ceilometer_instance_data_cache.get(instance_uuid) + return self.ceilometer_instance_data_cache.get(instance.uuid) - def get_node_utilization(self, node, model, period=3600, aggr='avg'): + def get_node_utilization(self, node, period=3600, aggr='avg'): """Collect cpu, ram and disk utilization statistics of a node. :param node: node object - :param model: model_root object :param period: seconds :param aggr: string :return: dict(cpu(number of cores used), ram(MB used), disk(B used)) """ - node_instances = model.get_node_instances(node) + node_instances = self.compute_model.get_node_instances(node) node_ram_util = 0 node_disk_util = 0 node_cpu_util = 0 for instance in node_instances: instance_util = self.get_instance_utilization( - instance.uuid, model, period, aggr) + instance, period, aggr) node_cpu_util += instance_util['cpu'] node_ram_util += instance_util['ram'] node_disk_util += instance_util['disk'] @@ -267,53 +260,40 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): return dict(cpu=node_cpu_util, ram=node_ram_util, disk=node_disk_util) - def get_node_capacity(self, node, model): + def get_node_capacity(self, node): """Collect cpu, ram and disk capacity of a node. :param node: node object - :param model: model_root object :return: dict(cpu(cores), ram(MB), disk(B)) """ - node_cpu_capacity = model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity(node) + return dict(cpu=node.vcpus, ram=node.memory, disk=node.disk_capacity) - node_disk_capacity = model.get_resource_by_uuid( - element.ResourceType.disk_capacity).get_capacity(node) - - node_ram_capacity = model.get_resource_by_uuid( - element.ResourceType.memory).get_capacity(node) - return dict(cpu=node_cpu_capacity, ram=node_ram_capacity, - disk=node_disk_capacity) - - def get_relative_node_utilization(self, node, model): - """Return relative node utilization (rhu). + def get_relative_node_utilization(self, node): + """Return relative node utilization. :param node: node object - :param model: model_root object :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>} """ - rhu = {} - util = self.get_node_utilization(node, model) - cap = self.get_node_capacity(node, model) + relative_node_utilization = {} + util = self.get_node_utilization(node) + cap = self.get_node_capacity(node) for k in util.keys(): - rhu[k] = float(util[k]) / float(cap[k]) - return rhu + relative_node_utilization[k] = float(util[k]) / float(cap[k]) + return relative_node_utilization - def get_relative_cluster_utilization(self, model): + def get_relative_cluster_utilization(self): """Calculate relative cluster utilization (rcu). RCU is an average of relative utilizations (rhu) of active nodes. - :param model: model_root object :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>} """ - nodes = model.get_all_compute_nodes().values() + nodes = self.compute_model.get_all_compute_nodes().values() rcu = {} counters = {} for node in nodes: node_state_str = self.get_state_str(node.state) if node_state_str == element.ServiceState.ENABLED.value: - rhu = self.get_relative_node_utilization( - node, model) + rhu = self.get_relative_node_utilization(node) for k in rhu.keys(): if k not in rcu: rcu[k] = 0 @@ -325,39 +305,35 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): rcu[k] /= counters[k] return rcu - def is_overloaded(self, node, model, cc): + def is_overloaded(self, node, cc): """Indicate whether a node is overloaded. This considers provided resource capacity coefficients (cc). :param node: node object - :param model: model_root object :param cc: dictionary containing resource capacity coefficients :return: [True, False] """ - node_capacity = self.get_node_capacity(node, model) + node_capacity = self.get_node_capacity(node) node_utilization = self.get_node_utilization( - node, model) + node) metrics = ['cpu'] for m in metrics: if node_utilization[m] > node_capacity[m] * cc[m]: return True return False - def instance_fits(self, instance_uuid, node, model, cc): + def instance_fits(self, instance, node, cc): """Indicate whether is a node able to accommodate a VM. This considers provided resource capacity coefficients (cc). - :param instance_uuid: string + :param instance: :py:class:`~.element.Instance` :param node: node object - :param model: model_root object :param cc: dictionary containing resource capacity coefficients :return: [True, False] """ - node_capacity = self.get_node_capacity(node, model) - node_utilization = self.get_node_utilization( - node, model) - instance_utilization = self.get_instance_utilization( - instance_uuid, model) + node_capacity = self.get_node_capacity(node) + node_utilization = self.get_node_utilization(node) + instance_utilization = self.get_instance_utilization(instance) metrics = ['cpu', 'ram', 'disk'] for m in metrics: if (instance_utilization[m] + node_utilization[m] > @@ -365,7 +341,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): return False return True - def optimize_solution(self, model): + def optimize_solution(self): """Optimize solution. This is done by eliminating unnecessary or circular set of migrations @@ -378,8 +354,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): one migration instead of two. * A->B, B->A => remove A->B and B->A as they do not result in a new VM placement. - - :param model: model_root object """ migrate_actions = ( a for a in self.solution.actions if a[ @@ -398,13 +372,15 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): for a in actions: self.solution.actions.remove(a) self.number_of_migrations -= 1 - src_node = model.get_node_by_uuid(src_uuid) - dst_node = model.get_node_by_uuid(dst_uuid) - instance = model.get_instance_by_uuid(instance_uuid) - if model.migrate_instance(instance, dst_node, src_node): - self.add_migration(instance, src_node, dst_node, model) + src_node = self.compute_model.get_node_by_uuid(src_uuid) + dst_node = self.compute_model.get_node_by_uuid(dst_uuid) + instance = self.compute_model.get_instance_by_uuid( + instance_uuid) + if self.compute_model.migrate_instance( + instance, dst_node, src_node): + self.add_migration(instance, src_node, dst_node) - def offload_phase(self, model, cc): + def offload_phase(self, cc): """Perform offloading phase. This considers provided resource capacity coefficients. @@ -420,29 +396,28 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): the node enabler in this phase doesn't necessarily results in more enabled nodes in the final solution. - :param model: model_root object :param cc: dictionary containing resource capacity coefficients """ sorted_nodes = sorted( - model.get_all_compute_nodes().values(), - key=lambda x: self.get_node_utilization(x, model)['cpu']) + self.compute_model.get_all_compute_nodes().values(), + key=lambda x: self.get_node_utilization(x)['cpu']) for node in reversed(sorted_nodes): - if self.is_overloaded(node, model, cc): + if self.is_overloaded(node, cc): for instance in sorted( - model.get_node_instances(node), + self.compute_model.get_node_instances(node), key=lambda x: self.get_instance_utilization( - x.uuid, model)['cpu'] + x)['cpu'] ): for destination_node in reversed(sorted_nodes): if self.instance_fits( - instance.uuid, destination_node, model, cc): + instance, destination_node, cc): self.add_migration(instance, node, - destination_node, model) + destination_node) break - if not self.is_overloaded(node, model, cc): + if not self.is_overloaded(node, cc): break - def consolidation_phase(self, model, cc): + def consolidation_phase(self, cc): """Perform consolidation phase. This considers provided resource capacity coefficients. @@ -454,27 +429,25 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): in the system than less cpu utilizied VMs which can be later used to fill smaller CPU capacity gaps. - :param model: model_root object :param cc: dictionary containing resource capacity coefficients """ sorted_nodes = sorted( - model.get_all_compute_nodes().values(), - key=lambda x: self.get_node_utilization(x, model)['cpu']) + self.compute_model.get_all_compute_nodes().values(), + key=lambda x: self.get_node_utilization(x)['cpu']) asc = 0 for node in sorted_nodes: instances = sorted( - model.get_node_instances(node), - key=lambda x: self.get_instance_utilization( - x.uuid, model)['cpu']) + self.compute_model.get_node_instances(node), + key=lambda x: self.get_instance_utilization(x)['cpu']) for instance in reversed(instances): dsc = len(sorted_nodes) - 1 for destination_node in reversed(sorted_nodes): if asc >= dsc: break if self.instance_fits( - instance.uuid, destination_node, model, cc): + instance, destination_node, cc): self.add_migration(instance, node, - destination_node, model) + destination_node) break dsc -= 1 asc += 1 @@ -503,25 +476,23 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): :param original_model: root_model object """ LOG.info(_LI('Executing Smart Strategy')) - model = self.compute_model - rcu = self.get_relative_cluster_utilization(model) - self.ceilometer_vm_data_cache = dict() + rcu = self.get_relative_cluster_utilization() cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} # Offloading phase - self.offload_phase(model, cc) + self.offload_phase(cc) # Consolidation phase - self.consolidation_phase(model, cc) + self.consolidation_phase(cc) # Optimize solution - self.optimize_solution(model) + self.optimize_solution() # disable unused nodes - self.disable_unused_nodes(model) + self.disable_unused_nodes() - rcu_after = self.get_relative_cluster_utilization(model) + rcu_after = self.get_relative_cluster_utilization() info = { "compute_nodes_count": len( self.compute_model.get_all_compute_nodes()), diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index d78ecbe89..efdd07849 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -145,17 +145,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): }, } - def calculate_used_resource(self, node, cap_cores, cap_mem, - cap_disk): + def calculate_used_resource(self, node): """Calculate the used vcpus, memory and disk based on VM flavors""" instances = self.compute_model.get_node_instances(node) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 for instance in instances: - vcpus_used += cap_cores.get_capacity(instance) - memory_mb_used += cap_mem.get_capacity(instance) - disk_gb_used += cap_disk.get_capacity(instance) + vcpus_used += instance.vcpus + memory_mb_used += instance.memory + disk_gb_used += instance.disk return vcpus_used, memory_mb_used, disk_gb_used @@ -200,18 +199,10 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): def filter_destination_hosts(self, hosts, instance_to_migrate, avg_workload, workload_cache): - '''Only return hosts with sufficient available resources''' - - cap_cores = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) - cap_disk = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk) - cap_mem = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory) - - required_cores = cap_cores.get_capacity(instance_to_migrate) - required_disk = cap_disk.get_capacity(instance_to_migrate) - required_mem = cap_mem.get_capacity(instance_to_migrate) + """Only return hosts with sufficient available resources""" + required_cores = instance_to_migrate.vcpus + required_disk = instance_to_migrate.disk + required_mem = instance_to_migrate.memory # filter nodes without enough resource destination_hosts = [] @@ -221,16 +212,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): workload = instance_data['workload'] # calculate the available resources cores_used, mem_used, disk_used = self.calculate_used_resource( - host, cap_cores, cap_mem, cap_disk) - cores_available = cap_cores.get_capacity(host) - cores_used - disk_available = cap_disk.get_capacity(host) - disk_used - mem_available = cap_mem.get_capacity(host) - mem_used + host) + cores_available = host.vcpus - cores_used + disk_available = host.disk - disk_used + mem_available = host.memory - mem_used if ( cores_available >= required_cores and disk_available >= required_disk and mem_available >= required_mem and - (src_instance_workload + workload) < self.threshold / 100 * - cap_cores.get_capacity(host) + ((src_instance_workload + workload) < + self.threshold / 100 * host.vcpus) ): destination_hosts.append(instance_data) @@ -249,9 +240,6 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): cluster_size = len(nodes) if not nodes: raise wexc.ClusterEmpty() - # get cpu cores capacity of nodes and instances - cap_cores = self.compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) overload_hosts = [] nonoverload_hosts = [] # total workload of cluster @@ -259,8 +247,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): # use workload_cache to store the workload of VMs for reuse purpose workload_cache = {} for node_id in nodes: - node = self.compute_model.get_node_by_uuid( - node_id) + node = self.compute_model.get_node_by_uuid(node_id) instances = self.compute_model.get_node_instances(node) node_workload = 0.0 for instance in instances: @@ -277,19 +264,17 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): if cpu_util is None: LOG.debug("Instance (%s): cpu_util is None", instance.uuid) continue - instance_cores = cap_cores.get_capacity(instance) - workload_cache[instance.uuid] = cpu_util * instance_cores / 100 + workload_cache[instance.uuid] = cpu_util * instance.vcpus / 100 node_workload += workload_cache[instance.uuid] LOG.debug("VM (%s): cpu_util %f", instance.uuid, cpu_util) - node_cores = cap_cores.get_capacity(node) - hy_cpu_util = node_workload / node_cores * 100 + node_cpu_util = node_workload / node.vcpus * 100 cluster_workload += node_workload instance_data = { - 'node': node, "cpu_util": hy_cpu_util, + 'node': node, "cpu_util": node_cpu_util, 'workload': node_workload} - if hy_cpu_util >= self.threshold: + if node_cpu_util >= self.threshold: # mark the node to release resources overload_hosts.append(instance_data) else: diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index 318cfd8b1..11ecd5220 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -185,20 +185,17 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): (instance_load['vcpus'] / float(host_vcpus))) @MEMOIZE - def get_instance_load(self, instance_uuid): + def get_instance_load(self, instance): """Gathering instance load through ceilometer statistic. - :param instance_uuid: instance for which statistic is gathered. + :param instance: instance for which statistic is gathered. :return: dict """ LOG.debug('get_instance_load started') - instance_vcpus = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity( - self.compute_model.get_instance_by_uuid(instance_uuid)) - instance_load = {'uuid': instance_uuid, 'vcpus': instance_vcpus} + instance_load = {'uuid': instance.uuid, 'vcpus': instance.vcpus} for meter in self.metrics: avg_meter = self.ceilometer.statistic_aggregation( - resource_id=instance_uuid, + resource_id=instance.uuid, meter_name=meter, period=self.periods['instance'], aggregate='min' @@ -207,8 +204,8 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): LOG.warning( _LW("No values returned by %(resource_id)s " "for %(metric_name)s") % dict( - resource_id=instance_uuid, - metric_name=meter)) + resource_id=instance.uuid, + metric_name=meter)) avg_meter = 0 if meter == 'cpu_util': avg_meter /= float(100) @@ -219,10 +216,8 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): normalized_hosts = copy.deepcopy(hosts) for host in normalized_hosts: if 'memory.resident' in normalized_hosts[host]: - h_memory = self.compute_model.get_resource_by_uuid( - element.ResourceType.memory).get_capacity( - self.compute_model.get_node_by_uuid(host)) - normalized_hosts[host]['memory.resident'] /= float(h_memory) + node = self.compute_model.get_node_by_uuid(host) + normalized_hosts[host]['memory.resident'] /= float(node.memory) return normalized_hosts @@ -237,13 +232,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): hosts_load = {} for node_id, node in self.get_available_nodes().items(): hosts_load[node_id] = {} - host_vcpus = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity( - self.compute_model.get_node_by_uuid(node_id)) - hosts_load[node_id]['vcpus'] = host_vcpus + hosts_load[node_id]['vcpus'] = node.vcpus for metric in self.metrics: - resource_id = '' meter_name = self.instance_metrics[metric] if re.match('^compute.node', meter_name) is not None: @@ -294,34 +285,31 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): " for %s in weight dict.") % metric) return weighted_sd - def calculate_migration_case(self, hosts, instance_id, - src_node_id, dst_node_id): + def calculate_migration_case(self, hosts, instance, src_node, dst_node): """Calculate migration case Return list of standard deviation values, that appearing in case of migration of instance from source host to destination host :param hosts: hosts with their workload - :param instance_id: the virtual machine - :param src_node_id: the source node id - :param dst_node_id: the destination node id + :param instance: the virtual machine + :param src_node: the source node + :param dst_node: the destination node :return: list of standard deviation values """ migration_case = [] new_hosts = copy.deepcopy(hosts) - instance_load = self.get_instance_load(instance_id) - d_host_vcpus = new_hosts[dst_node_id]['vcpus'] - s_host_vcpus = new_hosts[src_node_id]['vcpus'] + instance_load = self.get_instance_load(instance) + s_host_vcpus = new_hosts[src_node.uuid]['vcpus'] + d_host_vcpus = new_hosts[dst_node.uuid]['vcpus'] for metric in self.metrics: if metric is 'cpu_util': - new_hosts[src_node_id][metric] -= self.transform_instance_cpu( - instance_load, - s_host_vcpus) - new_hosts[dst_node_id][metric] += self.transform_instance_cpu( - instance_load, - d_host_vcpus) + new_hosts[src_node.uuid][metric] -= ( + self.transform_instance_cpu(instance_load, s_host_vcpus)) + new_hosts[dst_node.uuid][metric] += ( + self.transform_instance_cpu(instance_load, d_host_vcpus)) else: - new_hosts[src_node_id][metric] -= instance_load[metric] - new_hosts[dst_node_id][metric] += instance_load[metric] + new_hosts[src_node.uuid][metric] -= instance_load[metric] + new_hosts[dst_node.uuid][metric] += instance_load[metric] normalized_hosts = self.normalize_hosts_load(new_hosts) for metric in self.metrics: migration_case.append(self.get_sd(normalized_hosts, metric)) @@ -343,26 +331,27 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): instance_host_map = [] nodes = list(self.get_available_nodes()) - for src_node_id in nodes: - src_node = self.compute_model.get_node_by_uuid(src_node_id) + for src_host in nodes: + src_node = self.compute_model.get_node_by_uuid(src_host) c_nodes = copy.copy(nodes) - c_nodes.remove(src_node_id) + c_nodes.remove(src_host) node_list = yield_nodes(c_nodes) for instance in self.compute_model.get_node_instances(src_node): min_sd_case = {'value': len(self.metrics)} if instance.state not in [element.InstanceState.ACTIVE.value, element.InstanceState.PAUSED.value]: continue - for dst_node_id in next(node_list): + for dst_host in next(node_list): + dst_node = self.compute_model.get_node_by_uuid(dst_host) sd_case = self.calculate_migration_case( - hosts, instance.uuid, src_node_id, dst_node_id) + hosts, instance, src_node, dst_node) weighted_sd = self.calculate_weighted_sd(sd_case[:-1]) if weighted_sd < min_sd_case['value']: min_sd_case = { - 'host': dst_node_id, 'value': weighted_sd, - 's_host': src_node_id, 'instance': instance.uuid} + 'host': dst_node.uuid, 'value': weighted_sd, + 's_host': src_node.uuid, 'instance': instance.uuid} instance_host_map.append(min_sd_case) return sorted(instance_host_map, key=lambda x: x['value']) @@ -433,19 +422,16 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): min_sd = 1 balanced = False for instance_host in migration: - dst_hp_disk = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk).get_capacity( - self.compute_model.get_node_by_uuid( - instance_host['host'])) - instance_disk = self.compute_model.get_resource_by_uuid( - element.ResourceType.disk).get_capacity( - self.compute_model.get_instance_by_uuid( - instance_host['instance'])) - if instance_disk > dst_hp_disk: + instance = self.compute_model.get_instance_by_uuid( + instance_host['instance']) + src_node = self.compute_model.get_node_by_uuid( + instance_host['s_host']) + dst_node = self.compute_model.get_node_by_uuid( + instance_host['host']) + if instance.disk > dst_node.disk: continue instance_load = self.calculate_migration_case( - hosts_load, instance_host['instance'], - instance_host['s_host'], instance_host['host']) + hosts_load, instance, src_node, dst_node) weighted_sd = self.calculate_weighted_sd(instance_load[:-1]) if weighted_sd < min_sd: min_sd = weighted_sd diff --git a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py index 93cfd1272..e0664158a 100644 --- a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py +++ b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py @@ -22,7 +22,6 @@ import os import mock from watcher.decision_engine.model.collector import base -from watcher.decision_engine.model import element from watcher.decision_engine.model import model_root as modelroot @@ -106,16 +105,12 @@ class FakeCeilometerMetrics(object): node = self.model.get_node_by_uuid(node_uuid) instances = self.model.get_node_instances(node) util_sum = 0.0 - node_cpu_cores = self.model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity_by_uuid(node.uuid) for instance_uuid in instances: - instance_cpu_cores = self.model.get_resource_by_uuid( - element.ResourceType.cpu_cores).\ - get_capacity(self.model.get_instance_by_uuid(instance_uuid)) - total_cpu_util = instance_cpu_cores * self.get_instance_cpu_util( - instance_uuid) + instance = self.model.get_instance_by_uuid(instance_uuid) + total_cpu_util = instance.vcpus * self.get_instance_cpu_util( + instance.uuid) util_sum += total_cpu_util / 100.0 - util_sum /= node_cpu_cores + util_sum /= node.vcpus return util_sum * 100.0 @staticmethod diff --git a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py index b24edea2d..8b07e4143 100644 --- a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py +++ b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py @@ -217,26 +217,18 @@ class TestNovaNotifications(NotificationTestCase): ) instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk = compute_model.get_resource_by_uuid( - element.ResourceType.disk) - disk_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.disk_capacity) - memory_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.memory) self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) - self.assertEqual(1, cpu_capacity.get_capacity(instance0)) - self.assertEqual(1, disk_capacity.get_capacity(instance0)) - self.assertEqual(512, memory_capacity.get_capacity(instance0)) + self.assertEqual(1, instance0.vcpus) + self.assertEqual(1, instance0.disk_capacity) + self.assertEqual(512, instance0.memory) m_get_compute_node_by_hostname.assert_called_once_with('Node_2') node_2 = compute_model.get_node_by_uuid('Node_2') - self.assertEqual(7777, memory_capacity.get_capacity(node_2)) - self.assertEqual(42, cpu_capacity.get_capacity(node_2)) - self.assertEqual(974, disk.get_capacity(node_2)) - self.assertEqual(1337, disk_capacity.get_capacity(node_2)) + self.assertEqual(7777, node_2.memory) + self.assertEqual(42, node_2.vcpus) + self.assertEqual(974, node_2.disk) + self.assertEqual(1337, node_2.disk_capacity) @mock.patch.object(nova_helper, "NovaHelper") def test_instance_update_node_notfound_set_unmapped( @@ -265,20 +257,12 @@ class TestNovaNotifications(NotificationTestCase): ) instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk = compute_model.get_resource_by_uuid( - element.ResourceType.disk) - disk_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.disk_capacity) - memory_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.memory) self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) - self.assertEqual(1, cpu_capacity.get_capacity(instance0)) - self.assertEqual(1, disk.get_capacity(instance0)) - self.assertEqual(1, disk_capacity.get_capacity(instance0)) - self.assertEqual(512, memory_capacity.get_capacity(instance0)) + self.assertEqual(1, instance0.vcpus) + self.assertEqual(1, instance0.disk) + self.assertEqual(1, instance0.disk_capacity) + self.assertEqual(512, instance0.memory) m_get_compute_node_by_hostname.assert_any_call('Node_2') self.assertRaises( @@ -306,17 +290,11 @@ class TestNovaNotifications(NotificationTestCase): ) instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.disk) - memory_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.memory) self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) - self.assertEqual(1, cpu_capacity.get_capacity(instance0)) - self.assertEqual(1, disk_capacity.get_capacity(instance0)) - self.assertEqual(512, memory_capacity.get_capacity(instance0)) + self.assertEqual(1, instance0.vcpus) + self.assertEqual(1, instance0.disk_capacity) + self.assertEqual(512, instance0.memory) def test_nova_instance_delete_end(self): compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() @@ -374,17 +352,11 @@ class TestLegacyNovaNotifications(NotificationTestCase): ) instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.disk) - memory_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.memory) self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) - self.assertEqual(1, cpu_capacity.get_capacity(instance0)) - self.assertEqual(1, disk_capacity.get_capacity(instance0)) - self.assertEqual(512, memory_capacity.get_capacity(instance0)) + self.assertEqual(1, instance0.vcpus) + self.assertEqual(1, instance0.disk_capacity) + self.assertEqual(512, instance0.memory) def test_legacy_instance_updated(self): compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() @@ -445,27 +417,19 @@ class TestLegacyNovaNotifications(NotificationTestCase): ) instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk = compute_model.get_resource_by_uuid( - element.ResourceType.disk) - disk_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.disk_capacity) - memory_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.memory) self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) - self.assertEqual(1, cpu_capacity.get_capacity(instance0)) - self.assertEqual(1, disk.get_capacity(instance0)) - self.assertEqual(1, disk_capacity.get_capacity(instance0)) - self.assertEqual(512, memory_capacity.get_capacity(instance0)) + self.assertEqual(1, instance0.vcpus) + self.assertEqual(1, instance0.disk) + self.assertEqual(1, instance0.disk_capacity) + self.assertEqual(512, instance0.memory) m_get_compute_node_by_hostname.assert_any_call('Node_2') node_2 = compute_model.get_node_by_uuid('Node_2') - self.assertEqual(7777, memory_capacity.get_capacity(node_2)) - self.assertEqual(42, cpu_capacity.get_capacity(node_2)) - self.assertEqual(974, disk.get_capacity(node_2)) - self.assertEqual(1337, disk_capacity.get_capacity(node_2)) + self.assertEqual(7777, node_2.memory) + self.assertEqual(42, node_2.vcpus) + self.assertEqual(974, node_2.disk) + self.assertEqual(1337, node_2.disk_capacity) @mock.patch.object(nova_helper, "NovaHelper") def test_legacy_instance_update_node_notfound_set_unmapped( @@ -494,20 +458,12 @@ class TestLegacyNovaNotifications(NotificationTestCase): ) instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.vcpus) - disk = compute_model.get_resource_by_uuid( - element.ResourceType.disk) - disk_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.disk_capacity) - memory_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.memory) self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) - self.assertEqual(1, cpu_capacity.get_capacity(instance0)) - self.assertEqual(1, disk.get_capacity(instance0)) - self.assertEqual(1, disk_capacity.get_capacity(instance0)) - self.assertEqual(512, memory_capacity.get_capacity(instance0)) + self.assertEqual(1, instance0.vcpus) + self.assertEqual(1, instance0.disk) + self.assertEqual(1, instance0.disk_capacity) + self.assertEqual(512, instance0.memory) m_get_compute_node_by_hostname.assert_any_call('Node_2') self.assertRaises( diff --git a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py index 9859b668a..a3e31adf4 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py @@ -22,7 +22,6 @@ import mock from watcher.applier.loading import default from watcher.common import exception from watcher.common import utils -from watcher.decision_engine.model import element from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base @@ -69,15 +68,12 @@ class TestOutletTempControl(base.TestCase): self.strategy.input_parameters.update({'threshold': 34.3}) self.strategy.threshold = 34.3 - def test_calc_used_res(self): + def test_calc_used_resource(self): model = self.fake_cluster.generate_scenario_3_with_2_nodes() self.m_model.return_value = model node = model.get_node_by_uuid('Node_0') - cap_cores = model.get_resource_by_uuid(element.ResourceType.cpu_cores) - cap_mem = model.get_resource_by_uuid(element.ResourceType.memory) - cap_disk = model.get_resource_by_uuid(element.ResourceType.disk) - cores_used, mem_used, disk_used = self.strategy.calc_used_res( - node, cap_cores, cap_mem, cap_disk) + cores_used, mem_used, disk_used = self.strategy.calc_used_resource( + node) self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used)) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py index 66651cad9..c5a575e35 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py @@ -22,7 +22,6 @@ import mock from watcher.applier.loading import default from watcher.common import exception from watcher.common import utils -from watcher.decision_engine.model import element from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base @@ -74,16 +73,12 @@ class TestUniformAirflow(base.TestCase): self.strategy.threshold_power = 350 self._period = 300 - def test_calc_used_res(self): + def test_calc_used_resource(self): model = self.fake_cluster.generate_scenario_7_with_2_nodes() self.m_model.return_value = model node = model.get_node_by_uuid('Node_0') - cap_cores = model.get_resource_by_uuid(element.ResourceType.cpu_cores) - cap_mem = model.get_resource_by_uuid(element.ResourceType.memory) - cap_disk = model.get_resource_by_uuid(element.ResourceType.disk) - cores_used, mem_used, disk_used = self.\ - strategy.calculate_used_resource( - node, cap_cores, cap_mem, cap_disk) + cores_used, mem_used, disk_used = ( + self.strategy.calculate_used_resource(node)) self.assertEqual((cores_used, mem_used, disk_used), (25, 4, 40)) def test_group_hosts_by_airflow(self): diff --git a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py index 96ec0f9e2..95c79ca3b 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py @@ -81,7 +81,7 @@ class TestVMWorkloadConsolidation(base.TestCase): instance_util = dict(cpu=1.0, ram=1, disk=10) self.assertEqual( instance_util, - self.strategy.get_instance_utilization(instance_0.uuid, model)) + self.strategy.get_instance_utilization(instance_0, model)) def test_get_node_utilization(self): model = self.fake_cluster.generate_scenario_1() @@ -99,16 +99,14 @@ class TestVMWorkloadConsolidation(base.TestCase): self.fake_metrics.model = model node_0 = model.get_node_by_uuid("Node_0") node_util = dict(cpu=40, ram=64, disk=250) - self.assertEqual(node_util, - self.strategy.get_node_capacity(node_0, model)) + self.assertEqual(node_util, self.strategy.get_node_capacity(node_0)) def test_get_relative_node_utilization(self): model = self.fake_cluster.generate_scenario_1() self.m_model.return_value = model self.fake_metrics.model = model node = model.get_node_by_uuid('Node_0') - rhu = self.strategy.get_relative_node_utilization( - node, model) + rhu = self.strategy.get_relative_node_utilization(node) expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025} self.assertEqual(expected_rhu, rhu) @@ -116,7 +114,7 @@ class TestVMWorkloadConsolidation(base.TestCase): model = self.fake_cluster.generate_scenario_1() self.m_model.return_value = model self.fake_metrics.model = model - cru = self.strategy.get_relative_cluster_utilization(model) + cru = self.strategy.get_relative_cluster_utilization() expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375} self.assertEqual(expected_cru, cru) @@ -128,7 +126,7 @@ class TestVMWorkloadConsolidation(base.TestCase): n2 = model.get_node_by_uuid('Node_1') instance_uuid = 'INSTANCE_0' instance = model.get_instance_by_uuid(instance_uuid) - self.strategy.add_migration(instance, n1, n2, model) + self.strategy.add_migration(instance, n1, n2) self.assertEqual(1, len(self.strategy.solution.actions)) expected = {'action_type': 'migrate', 'input_parameters': {'destination_node': n2.uuid, @@ -143,15 +141,15 @@ class TestVMWorkloadConsolidation(base.TestCase): self.fake_metrics.model = model n1 = model.get_node_by_uuid('Node_0') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - res = self.strategy.is_overloaded(n1, model, cc) + res = self.strategy.is_overloaded(n1, cc) self.assertFalse(res) cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0} - res = self.strategy.is_overloaded(n1, model, cc) + res = self.strategy.is_overloaded(n1, cc) self.assertFalse(res) cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0} - res = self.strategy.is_overloaded(n1, model, cc) + res = self.strategy.is_overloaded(n1, cc) self.assertTrue(res) def test_instance_fits(self): @@ -159,13 +157,13 @@ class TestVMWorkloadConsolidation(base.TestCase): self.m_model.return_value = model self.fake_metrics.model = model n = model.get_node_by_uuid('Node_1') - instance_uuid = 'INSTANCE_0' + instance0 = model.get_instance_by_uuid('INSTANCE_0') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - res = self.strategy.instance_fits(instance_uuid, n, model, cc) + res = self.strategy.instance_fits(instance0, n, cc) self.assertTrue(res) cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0} - res = self.strategy.instance_fits(instance_uuid, n, model, cc) + res = self.strategy.instance_fits(instance0, n, cc) self.assertFalse(res) def test_add_action_enable_compute_node(self): @@ -198,13 +196,13 @@ class TestVMWorkloadConsolidation(base.TestCase): n2 = model.get_node_by_uuid('Node_1') instance_uuid = 'INSTANCE_0' instance = model.get_instance_by_uuid(instance_uuid) - self.strategy.disable_unused_nodes(model) + self.strategy.disable_unused_nodes() self.assertEqual(0, len(self.strategy.solution.actions)) # Migrate VM to free the node - self.strategy.add_migration(instance, n1, n2, model) + self.strategy.add_migration(instance, n1, n2) - self.strategy.disable_unused_nodes(model) + self.strategy.disable_unused_nodes() expected = {'action_type': 'change_nova_service_state', 'input_parameters': {'state': 'disabled', 'resource_id': 'Node_0'}} @@ -216,7 +214,7 @@ class TestVMWorkloadConsolidation(base.TestCase): self.m_model.return_value = model self.fake_metrics.model = model cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - self.strategy.offload_phase(model, cc) + self.strategy.offload_phase(cc) expected = [] self.assertEqual(expected, self.strategy.solution.actions) @@ -228,7 +226,7 @@ class TestVMWorkloadConsolidation(base.TestCase): n2 = model.get_node_by_uuid('Node_1') instance_uuid = 'INSTANCE_0' cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - self.strategy.consolidation_phase(model, cc) + self.strategy.consolidation_phase(cc) expected = [{'action_type': 'migrate', 'input_parameters': {'destination_node': n2.uuid, 'source_node': n1.uuid, @@ -242,9 +240,9 @@ class TestVMWorkloadConsolidation(base.TestCase): self.fake_metrics.model = model n1 = model.get_node_by_uuid('Node_0') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - self.strategy.offload_phase(model, cc) - self.strategy.consolidation_phase(model, cc) - self.strategy.optimize_solution(model) + self.strategy.offload_phase(cc) + self.strategy.consolidation_phase(cc) + self.strategy.optimize_solution() n2 = self.strategy.solution.actions[0][ 'input_parameters']['destination_node'] expected = [{'action_type': 'migrate', @@ -267,7 +265,7 @@ class TestVMWorkloadConsolidation(base.TestCase): n1 = model.get_node_by_uuid('Node_0') n2 = model.get_node_by_uuid('Node_1') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - self.strategy.offload_phase(model, cc) + self.strategy.offload_phase(cc) expected = [{'action_type': 'migrate', 'input_parameters': {'destination_node': n2.uuid, 'migration_type': 'live', @@ -284,14 +282,14 @@ class TestVMWorkloadConsolidation(base.TestCase): 'resource_id': 'INSTANCE_8', 'source_node': n1.uuid}}] self.assertEqual(expected, self.strategy.solution.actions) - self.strategy.consolidation_phase(model, cc) + self.strategy.consolidation_phase(cc) expected.append({'action_type': 'migrate', 'input_parameters': {'destination_node': n1.uuid, 'migration_type': 'live', 'resource_id': 'INSTANCE_7', 'source_node': n2.uuid}}) self.assertEqual(expected, self.strategy.solution.actions) - self.strategy.optimize_solution(model) + self.strategy.optimize_solution() del expected[3] del expected[1] self.assertEqual(expected, self.strategy.solution.actions) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index 529ece291..7c8d2aab9 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -22,7 +22,6 @@ import mock from watcher.applier.loading import default from watcher.common import exception from watcher.common import utils -from watcher.decision_engine.model import element from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base @@ -70,16 +69,12 @@ class TestWorkloadBalance(base.TestCase): self.strategy.threshold = 25.0 self.strategy._period = 300 - def test_calc_used_res(self): + def test_calc_used_resource(self): model = self.fake_cluster.generate_scenario_6_with_2_nodes() self.m_model.return_value = model node = model.get_node_by_uuid('Node_0') - cap_cores = model.get_resource_by_uuid(element.ResourceType.vcpus) - cap_mem = model.get_resource_by_uuid(element.ResourceType.memory) - cap_disk = model.get_resource_by_uuid(element.ResourceType.disk) cores_used, mem_used, disk_used = ( - self.strategy.calculate_used_resource( - node, cap_cores, cap_mem, cap_disk)) + self.strategy.calculate_used_resource(node)) self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40)) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py index 17e5b49e5..736744fec 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py @@ -93,25 +93,29 @@ class TestWorkloadStabilization(base.TestCase): self.strategy.periods = {"instance": 720, "node": 600} def test_get_instance_load(self): - self.m_model.return_value = self.fake_cluster.generate_scenario_1() + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + instance0 = model.get_instance_by_uuid("INSTANCE_0") instance_0_dict = { 'uuid': 'INSTANCE_0', 'vcpus': 10, 'cpu_util': 0.07, 'memory.resident': 2} self.assertEqual( - instance_0_dict, self.strategy.get_instance_load("INSTANCE_0")) + instance_0_dict, self.strategy.get_instance_load(instance0)) def test_periods(self): - self.m_model.return_value = self.fake_cluster.generate_scenario_1() + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model p_ceilometer = mock.patch.object( strategies.WorkloadStabilization, "ceilometer") m_ceilometer = p_ceilometer.start() self.addCleanup(p_ceilometer.stop) m_ceilometer.return_value = mock.Mock( statistic_aggregation=self.fake_metrics.mock_get_statistics) - self.strategy.get_instance_load("INSTANCE_0") + instance0 = model.get_instance_by_uuid("INSTANCE_0") + self.strategy.get_instance_load(instance0) m_ceilometer.statistic_aggregation.assert_called_with( aggregate='min', meter_name='memory.resident', - period=720, resource_id='INSTANCE_0') + period=720, resource_id=instance0.uuid) self.strategy.get_hosts_load() m_ceilometer.statistic_aggregation.assert_called_with( aggregate='avg', meter_name='hardware.memory.used', @@ -158,10 +162,14 @@ class TestWorkloadStabilization(base.TestCase): self.assertEqual(self.strategy.calculate_weighted_sd(sd_case), 1.25) def test_calculate_migration_case(self): - self.m_model.return_value = self.fake_cluster.generate_scenario_1() + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + instance = model.get_instance_by_uuid("INSTANCE_5") + src_node = model.get_node_by_uuid("Node_2") + dst_node = model.get_node_by_uuid("Node_1") result = self.strategy.calculate_migration_case( - self.hosts_load_assert, "INSTANCE_5", "Node_2", "Node_1")[-1][ - "Node_1"] + self.hosts_load_assert, instance, + src_node, dst_node)[-1][dst_node.uuid] result['cpu_util'] = round(result['cpu_util'], 3) self.assertEqual(result, {'cpu_util': 0.095, 'memory.resident': 21.0, 'vcpus': 40}) diff --git a/watcher_tempest_plugin/tests/scenario/test_execute_basic_optim.py b/watcher_tempest_plugin/tests/scenario/test_execute_basic_optim.py index c20123bc5..8568b7fa6 100644 --- a/watcher_tempest_plugin/tests/scenario/test_execute_basic_optim.py +++ b/watcher_tempest_plugin/tests/scenario/test_execute_basic_optim.py @@ -30,7 +30,7 @@ CONF = config.CONF class TestExecuteBasicStrategy(base.BaseInfraOptimScenarioTest): """Tests for action plans""" - BASIC_GOAL = "server_consolidation" + GOAL_NAME = "server_consolidation" @classmethod def skip_checks(cls): @@ -117,12 +117,15 @@ class TestExecuteBasicStrategy(base.BaseInfraOptimScenarioTest): all_hosts = host_client.list_hosts()['hosts'] compute_nodes = [x for x in all_hosts if x['service'] == 'compute'] - for _ in compute_nodes[:CONF.compute.min_compute_nodes]: + for idx, _ in enumerate( + compute_nodes[:CONF.compute.min_compute_nodes], start=1): # by getting to active state here, this means this has # landed on the host in question. - self.create_server(image_id=CONF.compute.image_ref, - wait_until='ACTIVE', - clients=self.mgr) + self.create_server( + name="instance-%d" % idx, + image_id=CONF.compute.image_ref, + wait_until='ACTIVE', + clients=self.mgr) def test_execute_basic_action_plan(self): """Execute an action plan based on the BASIC strategy @@ -136,7 +139,7 @@ class TestExecuteBasicStrategy(base.BaseInfraOptimScenarioTest): self.addCleanup(self.rollback_compute_nodes_status) self._create_one_instance_per_host() - _, goal = self.client.show_goal(self.BASIC_GOAL) + _, goal = self.client.show_goal(self.GOAL_NAME) _, strategy = self.client.show_strategy("basic") _, audit_template = self.create_audit_template( goal['uuid'], strategy=strategy['uuid'])