Merge "Add Overload standard deviation strategy"
This commit is contained in:
@@ -7,6 +7,7 @@ jsonpatch>=1.1 # BSD
|
|||||||
keystoneauth1>=2.1.0 # Apache-2.0
|
keystoneauth1>=2.1.0 # Apache-2.0
|
||||||
keystonemiddleware!=4.1.0,!=4.5.0,>=4.0.0 # Apache-2.0
|
keystonemiddleware!=4.1.0,!=4.5.0,>=4.0.0 # Apache-2.0
|
||||||
oslo.concurrency>=3.8.0 # Apache-2.0
|
oslo.concurrency>=3.8.0 # Apache-2.0
|
||||||
|
oslo.cache>=1.5.0 # Apache-2.0
|
||||||
oslo.config>=3.9.0 # Apache-2.0
|
oslo.config>=3.9.0 # Apache-2.0
|
||||||
oslo.context>=2.2.0 # Apache-2.0
|
oslo.context>=2.2.0 # Apache-2.0
|
||||||
oslo.db>=4.1.0 # Apache-2.0
|
oslo.db>=4.1.0 # Apache-2.0
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ watcher_strategies =
|
|||||||
basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
|
basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
|
||||||
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
|
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
|
||||||
vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
|
vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
|
||||||
|
workload_stabilization = watcher.decision_engine.strategy.strategies.workload_stabilization:WorkloadStabilization
|
||||||
|
|
||||||
watcher_actions =
|
watcher_actions =
|
||||||
migrate = watcher.applier.actions.migration:Migrate
|
migrate = watcher.applier.actions.migration:Migrate
|
||||||
|
|||||||
@@ -298,6 +298,14 @@ class NoAvailableStrategyForGoal(WatcherException):
|
|||||||
msg_fmt = _("No strategy could be found to achieve the '%(goal)s' goal.")
|
msg_fmt = _("No strategy could be found to achieve the '%(goal)s' goal.")
|
||||||
|
|
||||||
|
|
||||||
|
class NoMetricValuesForVM(WatcherException):
    """Raised when a metric query for a resource returns no values.

    The message is formatted with ``resource_id`` and ``metric_name``.
    """

    msg_fmt = _("No values returned by %(resource_id)s for %(metric_name)s.")
|
||||||
|
|
||||||
|
|
||||||
|
class NoSuchMetricForHost(WatcherException):
    """Raised when a host has no value for the requested metric.

    The message is formatted with ``metric`` and ``host``.
    """

    msg_fmt = _("No %(metric)s metric for %(host)s found.")
|
||||||
|
|
||||||
|
|
||||||
# Model
|
# Model
|
||||||
|
|
||||||
class InstanceNotFound(WatcherException):
|
class InstanceNotFound(WatcherException):
|
||||||
|
|||||||
@@ -20,11 +20,14 @@ from watcher.decision_engine.strategy.strategies import dummy_strategy
|
|||||||
from watcher.decision_engine.strategy.strategies import outlet_temp_control
|
from watcher.decision_engine.strategy.strategies import outlet_temp_control
|
||||||
from watcher.decision_engine.strategy.strategies import \
|
from watcher.decision_engine.strategy.strategies import \
|
||||||
vm_workload_consolidation
|
vm_workload_consolidation
|
||||||
|
from watcher.decision_engine.strategy.strategies import workload_stabilization
|
||||||
|
|
||||||
BasicConsolidation = basic_consolidation.BasicConsolidation
|
BasicConsolidation = basic_consolidation.BasicConsolidation
|
||||||
OutletTempControl = outlet_temp_control.OutletTempControl
|
OutletTempControl = outlet_temp_control.OutletTempControl
|
||||||
DummyStrategy = dummy_strategy.DummyStrategy
|
DummyStrategy = dummy_strategy.DummyStrategy
|
||||||
VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
|
VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
|
||||||
|
WorkloadStabilization = workload_stabilization.WorkloadStabilization
|
||||||
|
|
||||||
__all__ = ("BasicConsolidation", "OutletTempControl",
|
__all__ = ("BasicConsolidation", "OutletTempControl",
|
||||||
"DummyStrategy", "VMWorkloadConsolidation")
|
"DummyStrategy", "VMWorkloadConsolidation",
|
||||||
|
"WorkloadStabilization")
|
||||||
|
|||||||
@@ -222,3 +222,19 @@ class ThermalOptimizationBaseStrategy(BaseStrategy):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def get_translatable_goal_display_name(cls):
|
def get_translatable_goal_display_name(cls):
|
||||||
return "Thermal optimization"
|
return "Thermal optimization"
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
class WorkloadStabilizationBaseStrategy(BaseStrategy):
    """Abstract base for strategies serving the workload balancing goal."""

    @classmethod
    def get_goal_name(cls):
        """Return the identifier of the goal this strategy family serves."""
        return "WORKLOAD_BALANCING"

    @classmethod
    def get_goal_display_name(cls):
        """Return the translated, human-readable goal name."""
        return _("Workload balancing")

    @classmethod
    def get_translatable_goal_display_name(cls):
        """Return the untranslated, human-readable goal name."""
        return "Workload balancing"
|
||||||
|
|||||||
@@ -0,0 +1,414 @@
|
|||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
# Copyright (c) 2016 Servionica LLC
|
||||||
|
#
|
||||||
|
# Authors: Alexander Chadin <a.chadin@servionica.ru>
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
import itertools
|
||||||
|
import math
|
||||||
|
import random
|
||||||
|
|
||||||
|
import oslo_cache
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log
|
||||||
|
|
||||||
|
from watcher._i18n import _LI, _
|
||||||
|
from watcher.common import exception
|
||||||
|
from watcher.decision_engine.model import resource
|
||||||
|
from watcher.decision_engine.model import vm_state
|
||||||
|
from watcher.decision_engine.strategy.strategies import base
|
||||||
|
from watcher.metrics_engine.cluster_history import ceilometer as \
|
||||||
|
ceilometer_cluster_history
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
# Default VM meters used to rate the cluster load.
metrics = ['cpu_util', 'memory.resident']
# Default per-meter standard deviation thresholds.
thresholds_dict = {'cpu_util': 0.2, 'memory.resident': 0.2}
# Default per-meter weights; each key is the meter name plus '_weight'.
weights_dict = {'cpu_util_weight': 1.0, 'memory.resident_weight': 1.0}
# Maps each VM meter to the host meter queried for the same kind of load.
vm_host_measures = {'cpu_util': 'hardware.cpu.util',
                    'memory.resident': 'hardware.memory.used'}

ws_opts = [
    cfg.ListOpt('metrics',
                default=metrics,
                required=True,
                help='Metrics used as rates of cluster loads.'),
    cfg.DictOpt('thresholds',
                default=thresholds_dict,
                help='Dict where key is a meter name and value is the '
                     'standard deviation above which the cluster is '
                     'considered unbalanced for that meter.'),
    cfg.DictOpt('weights',
                default=weights_dict,
                help='These weights used to calculate '
                     'common standard deviation. Name of weight '
                     'contains meter name and _weight suffix.'),
    cfg.StrOpt('host_choice',
               default='retry',
               required=True,
               help="Method of host's choice."),
    cfg.IntOpt('retry_count',
               default=1,
               required=True,
               help='Count of random returned hosts.'),
]

CONF = cfg.CONF

# Registering at import time makes the options available as soon as the
# strategy module is loaded.
CONF.register_opts(ws_opts, 'watcher_strategies.workload_stabilization')
|
||||||
|
|
||||||
|
|
||||||
|
def _set_memoize(conf):
    """Build a memoization decorator backed by an oslo.cache region.

    :param conf: the configuration object oslo.cache is configured with
    :return: decorator created for the ``cache`` configuration group
    """
    oslo_cache.configure(conf)
    cache_region = oslo_cache.configure_cache_region(
        conf, oslo_cache.create_region())
    return oslo_cache.core.get_memoization_decorator(
        conf, cache_region, 'cache')
|
||||||
|
|
||||||
|
|
||||||
|
class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
    """Workload Stabilization control using live migration

    *Description*

    This is a workload stabilization strategy based on the standard
    deviation algorithm. The goal is to determine if there is an overload
    in a cluster and respond to it by migrating VMs to stabilize the
    cluster.

    *Requirements*

    * Software: Ceilometer component ceilometer-compute running
      in each compute host, and Ceilometer API can report such telemetries
      ``memory.resident`` and ``cpu_util`` successfully.
    * You must have at least 2 physical compute nodes to run this strategy.

    *Limitations*

    - It assumes that live migrations are possible
    - Load on the system is sufficiently stable.

    *Spec URL*

    https://review.openstack.org/#/c/286153/
    """

    MIGRATION = "migrate"
    MEMOIZE = _set_memoize(CONF)

    def __init__(self, osc=None):
        """Read the strategy options and prepare lazy client slots.

        :param osc: forwarded unchanged to the base strategy constructor
        """
        super(WorkloadStabilization, self).__init__(osc)
        self._ceilometer = None
        self._nova = None
        ws_conf = CONF['watcher_strategies.workload_stabilization']
        self.weights = ws_conf.weights
        self.metrics = ws_conf.metrics
        self.thresholds = ws_conf.thresholds
        self.host_choice = ws_conf.host_choice

    # NOTE(review): the entry point registered for this class is
    # ``workload_stabilization`` while get_name() returns the goal name;
    # confirm whether the strategy name is expected to match the entry
    # point before changing it.
    @classmethod
    def get_name(cls):
        """Return the identifier of this strategy."""
        return "WORKLOAD_BALANCING"

    @classmethod
    def get_display_name(cls):
        """Return the translated, human-readable strategy name."""
        return _("Workload balancing")

    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated, human-readable strategy name."""
        return "Workload balancing"

    @property
    def ceilometer(self):
        """Ceilometer cluster history helper, created on first access."""
        if self._ceilometer is None:
            self._ceilometer = (ceilometer_cluster_history.
                                CeilometerClusterHistory(osc=self.osc))
        return self._ceilometer

    @ceilometer.setter
    def ceilometer(self, c):
        self._ceilometer = c

    @property
    def nova(self):
        """Nova client obtained from ``self.osc`` on first access."""
        if self._nova is None:
            self._nova = self.osc.nova()
        return self._nova

    @nova.setter
    def nova(self, n):
        self._nova = n

    def transform_vm_cpu(self, vm_load, host_vcpus):
        """Transform vm cpu utilization to overall host cpu utilization.

        :param vm_load: dict that contains vm uuid and utilization info.
        :param host_vcpus: int
        :return: float value
        """
        return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus))

    # NOTE(review): results are memoized through the oslo.cache decorator;
    # confirm that the arguments (in particular current_model) produce a
    # suitable cache key.
    @MEMOIZE
    def get_vm_load(self, vm_uuid, current_model):
        """Gathering vm load through ceilometer statistic.

        :param vm_uuid: vm for which statistic is gathered.
        :param current_model: the cluster model
        :return: dict with ``uuid``, ``vcpus`` and one entry per metric
        :raises: NoMetricValuesForVM if ceilometer returns no value
        """
        # Debug messages are not marked for translation.
        LOG.debug('get_vm_load started')
        vm_vcpus = current_model.get_resource_from_id(
            resource.ResourceType.cpu_cores).get_capacity(
                current_model.get_vm_from_id(vm_uuid))
        vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus}
        for meter in self.metrics:
            # NOTE(review): the aggregate requested here is 'min' although
            # the result is stored as avg_meter; confirm this is intended.
            avg_meter = self.ceilometer.statistic_aggregation(
                resource_id=vm_uuid,
                meter_name=meter,
                period="120",
                aggregate='min'
            )
            if avg_meter is None:
                raise exception.NoMetricValuesForVM(resource_id=vm_uuid,
                                                    metric_name=meter)
            vm_load[meter] = avg_meter
        return vm_load

    def normalize_hosts_load(self, hosts, current_model):
        """Return a copy of the hosts load scaled to fractions.

        ``cpu_util`` is divided by 100 and ``memory.resident`` by the
        memory capacity of the host. The input mapping is not modified.

        :param hosts: dict of host id to its load dict
        :param current_model: the cluster model
        :return: new dict with the normalized values
        """
        normalized_hosts = deepcopy(hosts)
        for host in normalized_hosts:
            if 'cpu_util' in normalized_hosts[host]:
                normalized_hosts[host]['cpu_util'] /= float(100)

            if 'memory.resident' in normalized_hosts[host]:
                h_memory = current_model.get_resource_from_id(
                    resource.ResourceType.memory).get_capacity(
                        current_model.get_hypervisor_from_id(host))
                normalized_hosts[host]['memory.resident'] /= float(h_memory)

        return normalized_hosts

    def get_hosts_load(self, current_model):
        """Get load of every host by gathering vms load

        :param current_model: the cluster model
        :return: dict of host id to a dict holding ``vcpus`` and one value
                 per configured metric
        :raises: NoSuchMetricForHost if ceilometer returns no value
        """
        hosts_load = {}
        for hypervisor_id in current_model.get_all_hypervisors():
            hosts_load[hypervisor_id] = {}
            host_vcpus = current_model.get_resource_from_id(
                resource.ResourceType.cpu_cores).get_capacity(
                    current_model.get_hypervisor_from_id(hypervisor_id))
            hosts_load[hypervisor_id]['vcpus'] = host_vcpus

            for metric in self.metrics:
                avg_meter = self.ceilometer.statistic_aggregation(
                    resource_id=hypervisor_id,
                    meter_name=vm_host_measures[metric],
                    period="60",
                    aggregate='avg'
                )
                if avg_meter is None:
                    raise exception.NoSuchMetricForHost(
                        metric=vm_host_measures[metric],
                        host=hypervisor_id)
                hosts_load[hypervisor_id][metric] = avg_meter
        return hosts_load

    def get_sd(self, hosts, meter_name):
        """Get standard deviation among hosts by specified meter

        :param hosts: dict of host id to its load dict
        :param meter_name: key of the value to read in every load dict
        :return: population standard deviation as a float; 0.0 when
                 ``hosts`` is empty
        """
        if not hosts:
            # Nothing to compare: avoid dividing by zero.
            return 0.0
        # Work on floats so the result does not depend on integer division.
        values = [float(hosts[host_id][meter_name]) for host_id in hosts]
        mean = sum(values) / len(values)
        variance = sum((value - mean) ** 2 for value in values) / len(values)
        return math.sqrt(variance)

    def calculate_weighted_sd(self, sd_case):
        """Calculate common standard deviation among meters on host

        :param sd_case: standard deviations, in the order of self.metrics
        :return: sum of every value multiplied by the weight of its metric
        :raises: WatcherException if a metric has no ``<metric>_weight``
        """
        weighted_sd = 0
        for metric, value in zip(self.metrics, sd_case):
            try:
                weighted_sd += value * float(self.weights[metric + '_weight'])
            except KeyError as exc:
                LOG.exception(exc)
                raise exception.WatcherException(
                    _("Incorrect mapping: could not find associated weight"
                      " for %s in weight dict.") % metric)
        return weighted_sd

    def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id,
                                 current_model):
        """Calculate migration case

        Return list of standard deviation values, that appearing in case of
        migration of vm from source host to destination host

        :param hosts: hosts with their workload
        :param vm_id: the virtual machine
        :param src_hp_id: the source hypervisor id
        :param dst_hp_id: the destination hypervisor id
        :param current_model: the cluster model
        :return: list of standard deviation values followed, as last item,
                 by the simulated hosts load
        """
        migration_case = []
        new_hosts = deepcopy(hosts)
        vm_load = self.get_vm_load(vm_id, current_model)
        d_host_vcpus = new_hosts[dst_hp_id]['vcpus']
        s_host_vcpus = new_hosts[src_hp_id]['vcpus']
        for metric in self.metrics:
            # Compare by value: metric names come from configuration and
            # are not guaranteed to be the same object as the literal.
            if metric == 'cpu_util':
                new_hosts[src_hp_id][metric] -= self.transform_vm_cpu(
                    vm_load,
                    s_host_vcpus)
                new_hosts[dst_hp_id][metric] += self.transform_vm_cpu(
                    vm_load,
                    d_host_vcpus)
            else:
                new_hosts[src_hp_id][metric] -= vm_load[metric]
                new_hosts[dst_hp_id][metric] += vm_load[metric]
        normalized_hosts = self.normalize_hosts_load(new_hosts, current_model)
        for metric in self.metrics:
            migration_case.append(self.get_sd(normalized_hosts, metric))
        migration_case.append(new_hosts)
        return migration_case

    def simulate_migrations(self, current_model, hosts):
        """Make sorted list of pairs vm:dst_host

        :param current_model: the cluster model
        :param hosts: hosts with their workload
        :return: list of dicts with ``host``, ``value``, ``s_host`` and
                 ``vm`` keys, sorted by ascending ``value``
        """
        def yield_hypervisors(hypervisors):
            ct = CONF['watcher_strategies.workload_stabilization'].retry_count
            if self.host_choice == 'cycle':
                for i in itertools.cycle(hypervisors):
                    yield [i]
            if self.host_choice == 'retry':
                # random.sample() raises ValueError when asked for more
                # items than available, so never request more hosts than
                # there are candidates.
                sample_size = min(ct, len(hypervisors))
                while True:
                    yield random.sample(hypervisors, sample_size)
            if self.host_choice == 'fullsearch':
                while True:
                    yield hypervisors

        vm_host_map = []
        for source_hp_id in current_model.get_all_hypervisors():
            hypervisors = list(current_model.get_all_hypervisors())
            hypervisors.remove(source_hp_id)
            hypervisor_list = yield_hypervisors(hypervisors)
            vms_id = current_model.get_mapping(). \
                get_node_vms_from_id(source_hp_id)
            for vm_id in vms_id:
                min_sd_case = {'value': len(self.metrics)}
                vm = current_model.get_vm_from_id(vm_id)
                if vm.state not in [vm_state.VMState.ACTIVE.value,
                                    vm_state.VMState.PAUSED.value]:
                    continue
                for dst_hp_id in next(hypervisor_list):
                    sd_case = self.calculate_migration_case(hosts, vm_id,
                                                            source_hp_id,
                                                            dst_hp_id,
                                                            current_model)

                    weighted_sd = self.calculate_weighted_sd(sd_case[:-1])

                    if weighted_sd < min_sd_case['value']:
                        min_sd_case = {'host': dst_hp_id, 'value': weighted_sd,
                                       's_host': source_hp_id, 'vm': vm_id}
                        vm_host_map.append(min_sd_case)
                    # NOTE(review): only the first destination of each
                    # batch is evaluated; confirm this is intended for
                    # batches holding several hosts.
                    break
        return sorted(vm_host_map, key=lambda x: x['value'])

    def check_threshold(self, current_model):
        """Check if cluster is needed in balancing

        :param current_model: the cluster model
        :return: the result of simulate_migrations() as soon as one metric
                 exceeds its threshold, None otherwise
        """
        hosts_load = self.get_hosts_load(current_model)
        normalized_load = self.normalize_hosts_load(hosts_load, current_model)
        for metric in self.metrics:
            metric_sd = self.get_sd(normalized_load, metric)
            if metric_sd > float(self.thresholds[metric]):
                return self.simulate_migrations(current_model, hosts_load)

    def add_migration(self,
                      resource_id,
                      migration_type,
                      src_hypervisor,
                      dst_hypervisor):
        """Add a migrate action to the solution.

        :param resource_id: id of the resource to migrate
        :param migration_type: value stored as ``migration_type``
        :param src_hypervisor: value stored as ``src_hypervisor``
        :param dst_hypervisor: value stored as ``dst_hypervisor``
        """
        parameters = {'migration_type': migration_type,
                      'src_hypervisor': src_hypervisor,
                      'dst_hypervisor': dst_hypervisor}
        self.solution.add_action(action_type=self.MIGRATION,
                                 resource_id=resource_id,
                                 input_parameters=parameters)

    def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor,
                            mig_dst_hypervisor):
        """Create migration VM

        The action is only added when the mapping of the model accepts
        the migration.
        """
        if current_model.get_mapping().migrate_vm(
                mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
            self.add_migration(mig_vm.uuid, 'live',
                               mig_src_hypervisor.uuid,
                               mig_dst_hypervisor.uuid)

    def migrate(self, current_model, vm_uuid, src_host, dst_host):
        """Resolve the ids to model objects and register the migration.

        :param current_model: the cluster model
        :param vm_uuid: id of the virtual machine to move
        :param src_host: id of the source hypervisor
        :param dst_host: id of the destination hypervisor
        """
        mig_vm = current_model.get_vm_from_id(vm_uuid)
        mig_src_hypervisor = current_model.get_hypervisor_from_id(src_host)
        mig_dst_hypervisor = current_model.get_hypervisor_from_id(dst_host)
        self.create_migration_vm(current_model, mig_vm, mig_src_hypervisor,
                                 mig_dst_hypervisor)

    def fill_solution(self, current_model):
        """Attach the model to the solution and return the solution."""
        self.solution.model = current_model
        self.solution.efficacy = 100
        return self.solution

    def execute(self, orign_model):
        """Compute the migrations that stabilize the cluster workload.

        :param orign_model: the cluster model
        :return: the solution holding the migrate actions
        :raises: ClusterStateNotDefined if ``orign_model`` is None
        """
        LOG.info(_LI("Initializing Workload Stabilization"))
        current_model = orign_model

        if orign_model is None:
            raise exception.ClusterStateNotDefined()

        migration = self.check_threshold(current_model)
        if migration:
            hosts_load = self.get_hosts_load(current_model)
            min_sd = 1
            balanced = False
            for vm_host in migration:
                dst_hp_disk = current_model.get_resource_from_id(
                    resource.ResourceType.disk).get_capacity(
                        current_model.get_hypervisor_from_id(vm_host['host']))
                vm_disk = current_model.get_resource_from_id(
                    resource.ResourceType.disk).get_capacity(
                        current_model.get_vm_from_id(vm_host['vm']))
                # Skip destinations whose disk capacity is below the VM's.
                if vm_disk > dst_hp_disk:
                    continue
                vm_load = self.calculate_migration_case(hosts_load,
                                                        vm_host['vm'],
                                                        vm_host['s_host'],
                                                        vm_host['host'],
                                                        current_model)
                weighted_sd = self.calculate_weighted_sd(vm_load[:-1])
                if weighted_sd < min_sd:
                    # Keep the migration and continue from the simulated
                    # load it produces.
                    min_sd = weighted_sd
                    hosts_load = vm_load[-1]
                    self.migrate(current_model, vm_host['vm'],
                                 vm_host['s_host'], vm_host['host'])

                # NOTE(review): the loop stops as soon as one metric is
                # below its threshold; confirm that "any" rather than
                # "all" metrics is the intended condition.
                for metric, value in zip(self.metrics, vm_load[:-1]):
                    if value < float(self.thresholds[metric]):
                        balanced = True
                        break
                if balanced:
                    break
        return self.fill_solution(current_model)
|
||||||
@@ -30,10 +30,16 @@ class FakerMetricsCollector(object):
|
|||||||
def mock_get_statistics(self, resource_id, meter_name, period,
|
def mock_get_statistics(self, resource_id, meter_name, period,
|
||||||
aggregate='avg'):
|
aggregate='avg'):
|
||||||
result = 0
|
result = 0
|
||||||
if meter_name == "compute.node.cpu.percent":
|
if meter_name == "hardware.cpu.util":
|
||||||
result = self.get_usage_node_cpu(resource_id)
|
result = self.get_usage_node_cpu(resource_id)
|
||||||
|
elif meter_name == "compute.node.cpu.percent":
|
||||||
|
result = self.get_usage_node_cpu(resource_id)
|
||||||
|
elif meter_name == "hardware.memory.used":
|
||||||
|
result = self.get_usage_node_ram(resource_id)
|
||||||
elif meter_name == "cpu_util":
|
elif meter_name == "cpu_util":
|
||||||
result = self.get_average_usage_vm_cpu(resource_id)
|
result = self.get_average_usage_vm_cpu(resource_id)
|
||||||
|
elif meter_name == "memory.resident":
|
||||||
|
result = self.get_average_usage_vm_memory(resource_id)
|
||||||
elif meter_name == "hardware.ipmi.node.outlet_temperature":
|
elif meter_name == "hardware.ipmi.node.outlet_temperature":
|
||||||
result = self.get_average_outlet_temperature(resource_id)
|
result = self.get_average_outlet_temperature(resource_id)
|
||||||
return result
|
return result
|
||||||
@@ -49,6 +55,20 @@ class FakerMetricsCollector(object):
|
|||||||
|
|
||||||
return mock[str(uuid)]
|
return mock[str(uuid)]
|
||||||
|
|
||||||
|
def get_usage_node_ram(self, uuid):
    """Return the fake RAM usage of a node as a float.

    :param uuid: node identifier; it is looked up by its string form
    :return: the canned value of a known node, 8.0 for any other id
    """
    usage_by_node = {
        'Node_0': 7,
        'Node_1': 5,
        'Node_2': 29,
        'Node_3': 8,
        'Node_4': 4,
    }
    # Store and read through the same key so that a non-string id falls
    # back to the default instead of raising KeyError.
    return float(usage_by_node.get(str(uuid), 8))
|
||||||
|
|
||||||
def get_usage_node_cpu(self, uuid):
|
def get_usage_node_cpu(self, uuid):
|
||||||
"""The last VM CPU usage values to average
|
"""The last VM CPU usage values to average
|
||||||
|
|
||||||
@@ -77,6 +97,12 @@ class FakerMetricsCollector(object):
|
|||||||
# node 4
|
# node 4
|
||||||
mock['VM_7_hostname_7'] = 4
|
mock['VM_7_hostname_7'] = 4
|
||||||
|
|
||||||
|
mock['Node_0'] = 7
|
||||||
|
mock['Node_1'] = 5
|
||||||
|
mock['Node_2'] = 10
|
||||||
|
mock['Node_3'] = 4
|
||||||
|
mock['Node_4'] = 2
|
||||||
|
|
||||||
if uuid not in mock.keys():
|
if uuid not in mock.keys():
|
||||||
# mock[uuid] = random.randint(1, 4)
|
# mock[uuid] = random.randint(1, 4)
|
||||||
mock[uuid] = 8
|
mock[uuid] = 8
|
||||||
|
|||||||
@@ -0,0 +1,168 @@
|
|||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
# Copyright (c) 2016 Servionica LLC
|
||||||
|
#
|
||||||
|
# Authors: Alexander Chadin <a.chadin@servionica.ru>
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from watcher.decision_engine.strategy import strategies
|
||||||
|
from watcher.tests import base
|
||||||
|
from watcher.tests.decision_engine.strategy.strategies \
|
||||||
|
import faker_cluster_state
|
||||||
|
from watcher.tests.decision_engine.strategy.strategies \
|
||||||
|
import faker_metrics_collector
|
||||||
|
|
||||||
|
|
||||||
|
class TestWorkloadStabilization(base.BaseTestCase):
    """Unit tests for the WorkloadStabilization strategy.

    Ceilometer is replaced by the fake metrics collector and the cluster
    model comes from the fake model collector.
    """

    # fake metrics
    fake_metrics = faker_metrics_collector.FakerMetricsCollector()

    # fake cluster
    fake_cluster = faker_cluster_state.FakerModelCollector()

    # Load expected from get_hosts_load() for scenario 1.
    hosts_load_assert = {'Node_0':
                         {'cpu_util': 7.0, 'memory.resident': 7.0,
                          'vcpus': 40},
                         'Node_1':
                         {'cpu_util': 5.0, 'memory.resident': 5,
                          'vcpus': 40},
                         'Node_2':
                         {'cpu_util': 10.0, 'memory.resident': 29,
                          'vcpus': 40},
                         'Node_3':
                         {'cpu_util': 4.0, 'memory.resident': 8,
                          'vcpus': 40},
                         'Node_4':
                         {'cpu_util': 2.0, 'memory.resident': 4,
                          'vcpus': 40}}

    def test_get_vm_load(self):
        model = self.fake_cluster.generate_scenario_1()
        sd = strategies.WorkloadStabilization()
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        vm_0_dict = {'uuid': 'VM_0', 'vcpus': 10,
                     'cpu_util': 7, 'memory.resident': 2}
        self.assertEqual(vm_0_dict, sd.get_vm_load("VM_0", model))

    def test_normalize_hosts_load(self):
        model = self.fake_cluster.generate_scenario_1()
        sd = strategies.WorkloadStabilization()
        fake_hosts = {'Node_0': {'cpu_util': 7.0, 'memory.resident': 7},
                      'Node_1': {'cpu_util': 5.0, 'memory.resident': 5}}
        normalized_hosts = {'Node_0':
                            {'cpu_util': 0.07,
                             'memory.resident': 0.05303030303030303},
                            'Node_1':
                            {'cpu_util': 0.05,
                             'memory.resident': 0.03787878787878788}}
        self.assertEqual(normalized_hosts,
                         sd.normalize_hosts_load(fake_hosts, model))

    def test_get_hosts_load(self):
        sd = strategies.WorkloadStabilization()
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        self.assertEqual(
            self.hosts_load_assert,
            sd.get_hosts_load(self.fake_cluster.generate_scenario_1()))

    def test_get_sd(self):
        sd = strategies.WorkloadStabilization()
        test_cpu_sd = 2.7
        test_ram_sd = 9.3
        self.assertEqual(
            test_cpu_sd,
            round(sd.get_sd(self.hosts_load_assert, 'cpu_util'), 1))
        self.assertEqual(
            test_ram_sd,
            round(sd.get_sd(self.hosts_load_assert, 'memory.resident'), 1))

    def test_calculate_weighted_sd(self):
        sd = strategies.WorkloadStabilization()
        sd_case = [0.5, 0.75]
        self.assertEqual(1.25, sd.calculate_weighted_sd(sd_case))

    def test_calculate_migration_case(self):
        model = self.fake_cluster.generate_scenario_1()
        sd = strategies.WorkloadStabilization()
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        self.assertEqual(
            {'cpu_util': 7.5, 'memory.resident': 21, 'vcpus': 40},
            sd.calculate_migration_case(
                self.hosts_load_assert, "VM_5", "Node_2", "Node_1",
                model)[-1]["Node_1"])

    def test_simulate_migrations(self):
        sd = strategies.WorkloadStabilization()
        sd.host_choice = 'retry'
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        self.assertEqual(
            8,
            len(sd.simulate_migrations(self.fake_cluster.generate_scenario_1(),
                                       self.hosts_load_assert)))

    def test_check_threshold(self):
        sd = strategies.WorkloadStabilization()
        sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2}
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        sd.simulate_migrations = mock.Mock(return_value=True)
        self.assertTrue(
            sd.check_threshold(self.fake_cluster.generate_scenario_1()))

    def test_execute_one_migration(self):
        sd = strategies.WorkloadStabilization()
        model = self.fake_cluster.generate_scenario_1()
        sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2}
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4',
                                                          's_host': 'Node_2',
                                                          'host': 'Node_1'}])
        with mock.patch.object(sd, 'migrate') as mock_migration:
            sd.execute(model)
        mock_migration.assert_called_once_with(model, 'VM_4', 'Node_2',
                                               'Node_1')

    def test_execute_multiply_migrations(self):
        sd = strategies.WorkloadStabilization()
        model = self.fake_cluster.generate_scenario_1()
        sd.thresholds = {'cpu_util': 0.022, 'memory.resident': 0.0001}
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4',
                                                          's_host': 'Node_2',
                                                          'host': 'Node_1'},
                                                         {'vm': 'VM_3',
                                                          's_host': 'Node_2',
                                                          'host': 'Node_3'}])
        with mock.patch.object(sd, 'migrate') as mock_migrate:
            sd.execute(model)
        self.assertEqual(2, mock_migrate.call_count)

    def test_execute_nothing_to_migrate(self):
        sd = strategies.WorkloadStabilization()
        model = self.fake_cluster.generate_scenario_1()
        sd.thresholds = {'cpu_util': 0.042, 'memory.resident': 0.0001}
        sd.ceilometer = mock.MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        sd.simulate_migrations = mock.Mock(return_value=False)
        with mock.patch.object(sd, 'migrate') as mock_migrate:
            sd.execute(model)
        # call_count works with every mock release, whereas on releases
        # lacking assert_not_called() that name is auto-created as a
        # child mock and the check passes silently.
        self.assertEqual(0, mock_migrate.call_count)
|
||||||
Reference in New Issue
Block a user