Merge "Added pre/post execution methods to strategies"

This commit is contained in:
Jenkins
2016-06-15 08:59:58 +00:00
committed by Gerrit Code Review
18 changed files with 764 additions and 756 deletions

View File

@@ -18,7 +18,6 @@ from oslo_log import log
from watcher.common import clients
from watcher.decision_engine.strategy.context import base
from watcher.decision_engine.strategy.selection import default
from watcher.metrics_engine.cluster_model_collector import manager
from watcher import objects
@@ -30,11 +29,6 @@ class DefaultStrategyContext(base.BaseStrategyContext):
def __init__(self):
super(DefaultStrategyContext, self).__init__()
LOG.debug("Initializing Strategy Context")
self._collector_manager = manager.CollectorManager()
@property
def collector(self):
return self._collector_manager
def execute_strategy(self, audit_uuid, request_context):
audit = objects.Audit.get_by_uuid(request_context, audit_uuid)
@@ -46,10 +40,6 @@ class DefaultStrategyContext(base.BaseStrategyContext):
osc = clients.OpenStackClients()
# todo(jed) retrieve in audit_template parameters (threshold,...)
# todo(jed) create ActionPlan
collector_manager = self.collector.get_cluster_model_collector(osc=osc)
# todo(jed) remove call to get_latest_cluster_data_model
cluster_data_model = collector_manager.get_latest_cluster_data_model()
strategy_selector = default.DefaultStrategySelector(
goal_name=objects.Goal.get_by_id(
@@ -59,5 +49,4 @@ class DefaultStrategyContext(base.BaseStrategyContext):
selected_strategy = strategy_selector.select()
# todo(jed) add parameters and remove cluster_data_model
return selected_strategy.execute(cluster_data_model)
return selected_strategy.execute()

View File

@@ -20,14 +20,16 @@ from watcher.decision_engine.strategy.strategies import dummy_strategy
from watcher.decision_engine.strategy.strategies import outlet_temp_control
from watcher.decision_engine.strategy.strategies import \
vm_workload_consolidation
from watcher.decision_engine.strategy.strategies import workload_balance
from watcher.decision_engine.strategy.strategies import workload_stabilization
BasicConsolidation = basic_consolidation.BasicConsolidation
OutletTempControl = outlet_temp_control.OutletTempControl
DummyStrategy = dummy_strategy.DummyStrategy
VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
WorkloadBalance = workload_balance.WorkloadBalance
WorkloadStabilization = workload_stabilization.WorkloadStabilization
__all__ = ("BasicConsolidation", "OutletTempControl",
"DummyStrategy", "VMWorkloadConsolidation",
__all__ = ("BasicConsolidation", "OutletTempControl", "DummyStrategy",
"VMWorkloadConsolidation", "WorkloadBalance",
"WorkloadStabilization")

View File

@@ -39,12 +39,12 @@ which are dynamically loaded by Watcher at launch time.
import abc
import six
from watcher._i18n import _
from watcher.common import clients
from watcher.common.loader import loadable
from watcher.decision_engine.loading import default as loading
from watcher.decision_engine.solution import default
from watcher.decision_engine.strategy.common import level
from watcher.metrics_engine.cluster_model_collector import manager
@six.add_metaclass(abc.ABCMeta)
@@ -56,7 +56,13 @@ class BaseStrategy(loadable.Loadable):
"""
def __init__(self, config, osc=None):
""":param osc: an OpenStackClients instance"""
"""Constructor: the signature should be identical within the subclasses
:param config: Configuration related to this plugin
:type config: :py:class:`~.Struct`
:param osc: An OpenStackClients instance
:type osc: :py:class:`~.OpenStackClients` instance
"""
super(BaseStrategy, self).__init__(config)
self._name = self.get_name()
self._display_name = self.get_display_name()
@@ -66,6 +72,9 @@ class BaseStrategy(loadable.Loadable):
# the solution given by the strategy
self._solution = default.DefaultSolution()
self._osc = osc
self._collector_manager = None
self._model = None
self._goal = None
@classmethod
@abc.abstractmethod
@@ -109,14 +118,60 @@ class BaseStrategy(loadable.Loadable):
return []
@abc.abstractmethod
def execute(self, original_model):
def pre_execute(self):
"""Pre-execution phase
This can be used to fetch some pre-requisites or data.
"""
raise NotImplementedError()
@abc.abstractmethod
def do_execute(self):
"""Strategy execution phase
This phase is where you should put the main logic of your strategy.
"""
raise NotImplementedError()
@abc.abstractmethod
def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
raise NotImplementedError()
def execute(self):
"""Execute a strategy
:param original_model: The model the strategy is executed on
:type model: str
:return: A computed solution (via a placement algorithm)
:rtype: :class:`watcher.decision_engine.solution.base.BaseSolution`
:rtype: :py:class:`~.BaseSolution` instance
"""
self.pre_execute()
self.do_execute()
self.post_execute()
return self.solution
@property
def collector(self):
if self._collector_manager is None:
self._collector_manager = manager.CollectorManager()
return self._collector_manager
@property
def model(self):
"""Cluster data model
:returns: Cluster data model the strategy is executed on
:rtype model: :py:class:`~.ModelRoot` instance
"""
if self._model is None:
collector = self.collector.get_cluster_model_collector(
osc=self.osc)
self._model = collector.get_latest_cluster_data_model()
return self._model
@property
def osc(self):
@@ -140,6 +195,10 @@ class BaseStrategy(loadable.Loadable):
def display_name(self):
return self._display_name
@property
def goal(self):
return self._goal
@property
def strategy_level(self):
return self._strategy_level
@@ -202,11 +261,3 @@ class WorkloadStabilizationBaseStrategy(BaseStrategy):
@classmethod
def get_goal_name(cls):
return "workload_balancing"
@classmethod
def get_goal_display_name(cls):
return _("Workload balancing")
@classmethod
def get_translatable_goal_display_name(cls):
return "Workload balancing"

View File

@@ -71,11 +71,11 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
MIGRATION = "migrate"
CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
def __init__(self, config=None, osc=None):
def __init__(self, config, osc=None):
"""Basic offline Consolidation using live migration
:param config: A mapping containing the configuration of this strategy
:type config: dict
:type config: :py:class:`~.Struct` instance
:param osc: :py:class:`~.OpenStackClients` instance
"""
super(BasicConsolidation, self).__init__(config, osc)
@@ -134,17 +134,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
def compute_attempts(self, size_cluster):
"""Upper bound of the number of migration
:param size_cluster:
:param size_cluster: The size of the cluster
"""
self.migration_attempts = size_cluster * self.bound_migration
def check_migration(self, cluster_data_model,
src_hypervisor,
dest_hypervisor,
vm_to_mig):
"""check if the migration is possible
def check_migration(self, src_hypervisor, dest_hypervisor, vm_to_mig):
"""Check if the migration is possible
:param cluster_data_model: the current state of the cluster
:param self.model: the current state of the cluster
:param src_hypervisor: the current node of the virtual machine
:param dest_hypervisor: the destination of the virtual machine
:param vm_to_mig: the virtual machine
@@ -153,24 +150,22 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
if src_hypervisor == dest_hypervisor:
return False
LOG.debug('Migrate VM {0} from {1} to {2} '.format(vm_to_mig,
src_hypervisor,
dest_hypervisor,
))
LOG.debug('Migrate VM %s from %s to %s',
vm_to_mig, src_hypervisor, dest_hypervisor)
total_cores = 0
total_disk = 0
total_mem = 0
cpu_capacity = cluster_data_model.get_resource_from_id(
cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores)
disk_capacity = cluster_data_model.get_resource_from_id(
disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk)
memory_capacity = cluster_data_model.get_resource_from_id(
memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory)
for vm_id in cluster_data_model. \
for vm_id in self.model. \
get_mapping().get_node_vms(dest_hypervisor):
vm = cluster_data_model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
total_cores += cpu_capacity.get_capacity(vm)
total_disk += disk_capacity.get_capacity(vm)
total_mem += memory_capacity.get_capacity(vm)
@@ -180,42 +175,33 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
total_disk += disk_capacity.get_capacity(vm_to_mig)
total_mem += memory_capacity.get_capacity(vm_to_mig)
return self.check_threshold(cluster_data_model,
dest_hypervisor,
total_cores,
total_disk,
return self.check_threshold(dest_hypervisor, total_cores, total_disk,
total_mem)
def check_threshold(self, cluster_data_model,
dest_hypervisor,
total_cores,
total_disk,
total_mem):
def check_threshold(self, dest_hypervisor, total_cores,
total_disk, total_mem):
"""Check threshold
check the threshold value defined by the ratio of
aggregated CPU capacity of VMs on one node to CPU capacity
of this node must not exceed the threshold value.
:param cluster_data_model: the current state of the cluster
:param self.model: the current state of the cluster
:param dest_hypervisor: the destination of the virtual machine
:param total_cores: the total number of cores
:param total_disk: the total disk size
:param total_mem: the total amount of memory
:return: True if the threshold is not exceeded
"""
cpu_capacity = cluster_data_model.get_resource_from_id(
cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor)
disk_capacity = cluster_data_model.get_resource_from_id(
disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(dest_hypervisor)
memory_capacity = cluster_data_model.get_resource_from_id(
memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(dest_hypervisor)
if (cpu_capacity >= total_cores * self.threshold_cores and
return (cpu_capacity >= total_cores * self.threshold_cores and
disk_capacity >= total_disk * self.threshold_disk and
memory_capacity >= total_mem * self.threshold_mem):
return True
else:
return False
memory_capacity >= total_mem * self.threshold_mem)
def get_allowed_migration_attempts(self):
"""Allowed migration
@@ -226,37 +212,24 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
"""
return self.migration_attempts
def get_threshold_cores(self):
return self.threshold_cores
def set_threshold_cores(self, threshold):
self.threshold_cores = threshold
def get_number_of_released_nodes(self):
return self.number_of_released_nodes
def get_number_of_migrations(self):
return self.number_of_migrations
def calculate_weight(self, cluster_data_model, element,
total_cores_used, total_disk_used,
def calculate_weight(self, element, total_cores_used, total_disk_used,
total_memory_used):
"""Calculate weight of every resource
:param cluster_data_model:
:param self.model:
:param element:
:param total_cores_used:
:param total_disk_used:
:param total_memory_used:
:return:
"""
cpu_capacity = cluster_data_model.get_resource_from_id(
cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(element)
disk_capacity = cluster_data_model.get_resource_from_id(
disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(element)
memory_capacity = cluster_data_model.get_resource_from_id(
memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(element)
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
@@ -275,20 +248,19 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
# todo(jed) take in account weight
return (score_cores + score_disk + score_memory) / 3
def calculate_score_node(self, hypervisor, model):
"""calculate the score that represent the utilization level
def calculate_score_node(self, hypervisor):
"""Calculate the score that represent the utilization level
:param hypervisor:
:param model:
:return:
"""
:param hypervisor:
:return:
"""
resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname)
host_avg_cpu_util = self.ceilometer. \
statistic_aggregation(resource_id=resource_id,
meter_name=self.HOST_CPU_USAGE_METRIC_NAME,
period="7200",
aggregate='avg'
)
aggregate='avg')
if host_avg_cpu_util is None:
LOG.error(
_LE("No values returned by %(resource_id)s "
@@ -298,14 +270,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
)
host_avg_cpu_util = 100
cpu_capacity = model.get_resource_from_id(
cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(hypervisor)
total_cores_used = cpu_capacity * (host_avg_cpu_util / 100)
return self.calculate_weight(model, hypervisor, total_cores_used,
0,
0)
return self.calculate_weight(hypervisor, total_cores_used, 0, 0)
def calculate_migration_efficacy(self):
"""Calculate migration efficacy
@@ -319,16 +289,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
else:
return 0
def calculate_score_vm(self, vm, cluster_data_model):
def calculate_score_vm(self, vm):
"""Calculate Score of virtual machine
:param vm: the virtual machine
:param cluster_data_model: the cluster model
:param self.model: the cluster model
:return: score
"""
if cluster_data_model is None:
raise exception.ClusterStateNotDefined()
vm_cpu_utilization = self.ceilometer. \
statistic_aggregation(
resource_id=vm.uuid,
@@ -345,13 +312,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
)
vm_cpu_utilization = 100
cpu_capacity = cluster_data_model.get_resource_from_id(
cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(vm)
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0)
return self.calculate_weight(cluster_data_model, vm,
total_cores_used, 0, 0)
return self.calculate_weight(vm, total_cores_used, 0, 0)
def add_change_service_state(self, resource_id, state):
parameters = {'state': state}
@@ -371,48 +337,47 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
resource_id=resource_id,
input_parameters=parameters)
def score_of_nodes(self, cluster_data_model, score):
def score_of_nodes(self, score):
"""Calculate score of nodes based on load by VMs"""
for hypervisor_id in cluster_data_model.get_all_hypervisors():
hypervisor = cluster_data_model. \
for hypervisor_id in self.model.get_all_hypervisors():
hypervisor = self.model. \
get_hypervisor_from_id(hypervisor_id)
count = cluster_data_model.get_mapping(). \
count = self.model.get_mapping(). \
get_node_vms_from_id(hypervisor_id)
if len(count) > 0:
result = self.calculate_score_node(hypervisor,
cluster_data_model)
result = self.calculate_score_node(hypervisor)
else:
''' the hypervisor has not VMs '''
# The hypervisor has no VMs
result = 0
if len(count) > 0:
score.append((hypervisor_id, result))
return score
def node_and_vm_score(self, sorted_score, score, current_model):
def node_and_vm_score(self, sorted_score, score):
"""Get List of VMs from Node"""
node_to_release = sorted_score[len(score) - 1][0]
vms_to_mig = current_model.get_mapping().get_node_vms_from_id(
vms_to_mig = self.model.get_mapping().get_node_vms_from_id(
node_to_release)
vm_score = []
for vm_id in vms_to_mig:
vm = current_model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
if vm.state == vm_state.VMState.ACTIVE.value:
vm_score.append(
(vm_id, self.calculate_score_vm(vm, current_model)))
(vm_id, self.calculate_score_vm(vm)))
return node_to_release, vm_score
def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor,
def create_migration_vm(self, mig_vm, mig_src_hypervisor,
mig_dst_hypervisor):
"""Create migration VM """
if current_model.get_mapping().migrate_vm(
"""Create migration VM"""
if self.model.get_mapping().migrate_vm(
mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
self.add_migration(mig_vm.uuid, 'live',
mig_src_hypervisor.uuid,
mig_dst_hypervisor.uuid)
if len(current_model.get_mapping().get_node_vms(
if len(self.model.get_mapping().get_node_vms(
mig_src_hypervisor)) == 0:
self.add_change_service_state(mig_src_hypervisor.
uuid,
@@ -420,24 +385,22 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
DISABLED.value)
self.number_of_released_nodes += 1
def calculate_num_migrations(self, sorted_vms, current_model,
node_to_release, sorted_score):
def calculate_num_migrations(self, sorted_vms, node_to_release,
sorted_score):
number_migrations = 0
for vm in sorted_vms:
for j in range(0, len(sorted_score)):
mig_vm = current_model.get_vm_from_id(vm[0])
mig_src_hypervisor = current_model.get_hypervisor_from_id(
mig_vm = self.model.get_vm_from_id(vm[0])
mig_src_hypervisor = self.model.get_hypervisor_from_id(
node_to_release)
mig_dst_hypervisor = current_model.get_hypervisor_from_id(
mig_dst_hypervisor = self.model.get_hypervisor_from_id(
sorted_score[j][0])
result = self.check_migration(current_model,
mig_src_hypervisor,
mig_dst_hypervisor, mig_vm)
result = self.check_migration(
mig_src_hypervisor, mig_dst_hypervisor, mig_vm)
if result:
self.create_migration_vm(
current_model, mig_vm,
mig_src_hypervisor, mig_dst_hypervisor)
mig_vm, mig_src_hypervisor, mig_dst_hypervisor)
number_migrations += 1
break
return number_migrations
@@ -450,28 +413,26 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
else:
return unsuccessful_migration + 1
def execute(self, original_model):
def pre_execute(self):
LOG.info(_LI("Initializing Sercon Consolidation"))
if original_model is None:
if self.model is None:
raise exception.ClusterStateNotDefined()
def do_execute(self):
# todo(jed) clone model
current_model = original_model
self.efficacy = 100
unsuccessful_migration = 0
first_migration = True
size_cluster = len(current_model.get_all_hypervisors())
size_cluster = len(self.model.get_all_hypervisors())
if size_cluster == 0:
raise exception.ClusterEmpty()
self.compute_attempts(size_cluster)
for hypervisor_id in current_model.get_all_hypervisors():
hypervisor = current_model.get_hypervisor_from_id(hypervisor_id)
count = current_model.get_mapping(). \
for hypervisor_id in self.model.get_all_hypervisors():
hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
count = self.model.get_mapping(). \
get_node_vms_from_id(hypervisor_id)
if len(count) == 0:
if hypervisor.state == hyper_state.HypervisorState.ENABLED:
@@ -487,13 +448,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
first_migration = False
score = []
score = self.score_of_nodes(current_model, score)
score = self.score_of_nodes(score)
''' sort compute nodes by Score decreasing '''''
# Sort compute nodes by Score decreasing
sorted_score = sorted(score, reverse=True, key=lambda x: (x[1]))
LOG.debug("Hypervisor(s) BFD {0}".format(sorted_score))
LOG.debug("Hypervisor(s) BFD %s", sorted_score)
''' get Node to be released '''
# Get Node to be released
if len(score) == 0:
LOG.warning(_LW(
"The workloads of the compute nodes"
@@ -501,15 +462,15 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
break
node_to_release, vm_score = self.node_and_vm_score(
sorted_score, score, current_model)
sorted_score, score)
''' sort VMs by Score '''
# Sort VMs by Score
sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1]))
# BFD: Best Fit Decrease
LOG.debug("VM(s) BFD {0}".format(sorted_vms))
LOG.debug("VM(s) BFD %s", sorted_vms)
migrations = self.calculate_num_migrations(
sorted_vms, current_model, node_to_release, sorted_score)
sorted_vms, node_to_release, sorted_score)
unsuccessful_migration = self.unsuccessful_migration_actualization(
migrations, unsuccessful_migration)
@@ -519,6 +480,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
"efficacy": self.efficacy
}
LOG.debug(infos)
self.solution.model = current_model
self.solution.efficacy = self.efficacy
return self.solution
def post_execute(self):
pass

View File

@@ -19,6 +19,7 @@
from oslo_log import log
from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
@@ -48,16 +49,11 @@ class DummyStrategy(base.DummyBaseStrategy):
NOP = "nop"
SLEEP = "sleep"
def __init__(self, config=None, osc=None):
"""Dummy Strategy implemented for demo and testing purposes
def pre_execute(self):
if self.model is None:
raise exception.ClusterStateNotDefined()
:param config: A mapping containing the configuration of this strategy
:type config: dict
:param osc: :py:class:`~.OpenStackClients` instance
"""
super(DummyStrategy, self).__init__(config, osc)
def execute(self, original_model):
def do_execute(self):
LOG.debug("Executing Dummy strategy")
parameters = {'message': 'hello World'}
self.solution.add_action(action_type=self.NOP,
@@ -69,7 +65,9 @@ class DummyStrategy(base.DummyBaseStrategy):
self.solution.add_action(action_type=self.SLEEP,
input_parameters={'duration': 5.0})
return self.solution
def post_execute(self):
pass
@classmethod
def get_name(cls):

View File

@@ -30,7 +30,7 @@ telemetries to measure thermal/workload status of server.
from oslo_log import log
from watcher._i18n import _, _LE
from watcher._i18n import _, _LE, _LI
from watcher.common import exception as wexc
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
@@ -78,7 +78,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
MIGRATION = "migrate"
def __init__(self, config=None, osc=None):
def __init__(self, config, osc=None):
"""Outlet temperature control using live migration
:param config: A mapping containing the configuration of this strategy
@@ -116,26 +116,25 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
def ceilometer(self, c):
self._ceilometer = c
def calc_used_res(self, cluster_data_model, hypervisor, cpu_capacity,
def calc_used_res(self, hypervisor, cpu_capacity,
memory_capacity, disk_capacity):
'''calculate the used vcpus, memory and disk based on VM flavors'''
vms = cluster_data_model.get_mapping().get_node_vms(hypervisor)
"""Calculate the used vcpus, memory and disk based on VM flavors"""
vms = self.model.get_mapping().get_node_vms(hypervisor)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
if len(vms) > 0:
for vm_id in vms:
vm = cluster_data_model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
vcpus_used += cpu_capacity.get_capacity(vm)
memory_mb_used += memory_capacity.get_capacity(vm)
disk_gb_used += disk_capacity.get_capacity(vm)
return vcpus_used, memory_mb_used, disk_gb_used
def group_hosts_by_outlet_temp(self, cluster_data_model):
def group_hosts_by_outlet_temp(self):
"""Group hosts based on outlet temp meters"""
hypervisors = cluster_data_model.get_all_hypervisors()
hypervisors = self.model.get_all_hypervisors()
size_cluster = len(hypervisors)
if size_cluster == 0:
raise wexc.ClusterEmpty()
@@ -143,7 +142,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
hosts_need_release = []
hosts_target = []
for hypervisor_id in hypervisors:
hypervisor = cluster_data_model.get_hypervisor_from_id(
hypervisor = self.model.get_hypervisor_from_id(
hypervisor_id)
resource_id = hypervisor.uuid
@@ -166,37 +165,35 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
hosts_target.append(hvmap)
return hosts_need_release, hosts_target
def choose_vm_to_migrate(self, cluster_data_model, hosts):
"""pick up an active vm instance to migrate from provided hosts"""
def choose_vm_to_migrate(self, hosts):
"""Pick up an active vm instance to migrate from provided hosts"""
for hvmap in hosts:
mig_src_hypervisor = hvmap['hv']
vms_of_src = cluster_data_model.get_mapping().get_node_vms(
vms_of_src = self.model.get_mapping().get_node_vms(
mig_src_hypervisor)
if len(vms_of_src) > 0:
for vm_id in vms_of_src:
try:
# select the first active VM to migrate
vm = cluster_data_model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.info(_LE("VM not active, skipped: %s"),
vm.uuid)
continue
return mig_src_hypervisor, vm
except wexc.InstanceNotFound as e:
LOG.info("VM not found Error: %s" % e.message)
pass
LOG.exception(e)
LOG.info(_LI("VM not found"))
return None
def filter_dest_servers(self, cluster_data_model, hosts, vm_to_migrate):
def filter_dest_servers(self, hosts, vm_to_migrate):
"""Only return hosts with sufficient available resources"""
cpu_capacity = cluster_data_model.get_resource_from_id(
cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores)
disk_capacity = cluster_data_model.get_resource_from_id(
disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk)
memory_capacity = cluster_data_model.get_resource_from_id(
memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory)
required_cores = cpu_capacity.get_capacity(vm_to_migrate)
@@ -209,8 +206,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
host = hvmap['hv']
# available
cores_used, mem_used, disk_used = self.calc_used_res(
cluster_data_model, host, cpu_capacity, memory_capacity,
disk_capacity)
host, cpu_capacity, memory_capacity, disk_capacity)
cores_available = cpu_capacity.get_capacity(host) - cores_used
disk_available = disk_capacity.get_capacity(host) - disk_used
mem_available = memory_capacity.get_capacity(host) - mem_used
@@ -221,15 +217,14 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
return dest_servers
def execute(self, original_model):
def pre_execute(self):
LOG.debug("Initializing Outlet temperature strategy")
if original_model is None:
if self.model is None:
raise wexc.ClusterStateNotDefined()
current_model = original_model
hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp(
current_model)
def do_execute(self):
hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp()
if len(hosts_need_release) == 0:
# TODO(zhenzanz): return something right if there's no hot servers
@@ -245,16 +240,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
reverse=True,
key=lambda x: (x["outlet_temp"]))
vm_to_migrate = self.choose_vm_to_migrate(current_model,
hosts_need_release)
vm_to_migrate = self.choose_vm_to_migrate(hosts_need_release)
# calculate the vm's cpu cores,memory,disk needs
if vm_to_migrate is None:
return self.solution
mig_src_hypervisor, vm_src = vm_to_migrate
dest_servers = self.filter_dest_servers(current_model,
hosts_target,
vm_src)
dest_servers = self.filter_dest_servers(hosts_target, vm_src)
# sort the filtered result by outlet temp
# pick up the lowest one as dest server
if len(dest_servers) == 0:
@@ -267,9 +259,8 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
# always use the host with lowerest outlet temperature
mig_dst_hypervisor = dest_servers[0]['hv']
# generate solution to migrate the vm to the dest server,
if current_model.get_mapping().migrate_vm(vm_src,
mig_src_hypervisor,
mig_dst_hypervisor):
if self.model.get_mapping().migrate_vm(
vm_src, mig_src_hypervisor, mig_dst_hypervisor):
parameters = {'migration_type': 'live',
'src_hypervisor': mig_src_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid}
@@ -277,6 +268,6 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
resource_id=vm_src.uuid,
input_parameters=parameters)
self.solution.model = current_model
return self.solution
def post_execute(self):
self.solution.model = self.model
# TODO(v-francoise): Add the indicators to the solution

View File

@@ -84,7 +84,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
https://github.com/openstack/watcher-specs/blob/master/specs/mitaka/implemented/zhaw-load-consolidation.rst
""" # noqa
def __init__(self, config=None, osc=None):
def __init__(self, config, osc=None):
super(VMWorkloadConsolidation, self).__init__(config, osc)
self._ceilometer = None
self.number_of_migrations = 0
@@ -121,8 +121,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"""
if isinstance(state, six.string_types):
return state
elif (type(state) == hyper_state.HypervisorState or
type(state) == vm_state.VMState):
elif isinstance(state, (vm_state.VMState,
hyper_state.HypervisorState)):
return state.value
else:
LOG.error(_LE('Unexpexted resource state type, '
@@ -171,12 +171,10 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
vm_state_str = self.get_state_str(vm.state)
if vm_state_str != vm_state.VMState.ACTIVE.value:
'''
Watcher curently only supports live VM migration and block live
VM migration which both requires migrated VM to be active.
When supported, the cold migration may be used as a fallback
migration mechanism to move non active VMs.
'''
# Watcher currently only supports live VM migration and block live
# VM migration which both requires migrated VM to be active.
# When supported, the cold migration may be used as a fallback
# migration mechanism to move non active VMs.
LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, '
'state=%(vm_state)s.'),
vm_uuid=vm_uuid,
@@ -209,13 +207,13 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
if len(model.get_mapping().get_node_vms(hypervisor)) == 0:
self.add_action_deactivate_hypervisor(hypervisor)
def get_prediction_model(self, model):
def get_prediction_model(self):
"""Return a deepcopy of a model representing current cluster state.
:param model: model_root object
:return: model_root object
"""
return deepcopy(model)
return deepcopy(self.model)
def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a VM.
@@ -334,7 +332,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
hypervisors = model.get_all_hypervisors().values()
hypervisors = self.model.get_all_hypervisors().values()
rcu = {}
counters = {}
for hypervisor in hypervisors:
@@ -452,9 +450,11 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
for hypervisor in reversed(sorted_hypervisors):
if self.is_overloaded(hypervisor, model, cc):
for vm in sorted(model.get_mapping().get_node_vms(hypervisor),
key=lambda x: self.get_vm_utilization(
x, model)['cpu']):
for vm in sorted(
model.get_mapping().get_node_vms(hypervisor),
key=lambda x: self.get_vm_utilization(
x, model)['cpu']
):
for dst_hypervisor in reversed(sorted_hypervisors):
if self.vm_fits(vm, dst_hypervisor, model, cc):
self.add_migration(vm, hypervisor,
@@ -498,7 +498,11 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
dsc -= 1
asc += 1
def execute(self, original_model):
def pre_execute(self):
if self.model is None:
raise exception.ClusterStateNotDefined()
def do_execute(self):
"""Execute strategy.
This strategy produces a solution resulting in more
@@ -513,7 +517,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
:param original_model: root_model object
"""
LOG.info(_LI('Executing Smart Strategy'))
model = self.get_prediction_model(original_model)
model = self.get_prediction_model(self.model)
rcu = self.get_relative_cluster_utilization(model)
self.ceilometer_vm_data_cache = dict()
@@ -545,4 +549,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
self.solution.model = model
self.solution.efficacy = rcu_after['cpu']
return self.solution
def post_execute(self):
# TODO(v-francoise): Add the indicators to the solution
pass

View File

@@ -28,7 +28,7 @@ from watcher.metrics_engine.cluster_history import ceilometer as ceil
LOG = log.getLogger(__name__)
class WorkloadBalance(base.BaseStrategy):
class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
"""[PoC]Workload balance using live migration
*Description*
@@ -68,10 +68,12 @@ class WorkloadBalance(base.BaseStrategy):
MIGRATION = "migrate"
def __init__(self, config=None, osc=None):
"""Using live migration
def __init__(self, config, osc=None):
"""Workload balance using live migration
:param osc: an OpenStackClients object
:param config: A mapping containing the configuration of this strategy
:type config: :py:class:`~.Struct` instance
:param osc: :py:class:`~.OpenStackClients` instance
"""
super(WorkloadBalance, self).__init__(config, osc)
# the migration plan will be triggered when the CPU utlization %
@@ -104,44 +106,32 @@ class WorkloadBalance(base.BaseStrategy):
def get_translatable_display_name(cls):
return "workload balance migration strategy"
@classmethod
def get_goal_name(cls):
return "WORKLOAD_OPTIMIZATION"
@classmethod
def get_goal_display_name(cls):
return _("Workload optimization")
@classmethod
def get_translatable_goal_display_name(cls):
return "Workload optimization"
def calculate_used_resource(self, model, hypervisor, cap_cores, cap_mem,
def calculate_used_resource(self, hypervisor, cap_cores, cap_mem,
cap_disk):
'''calculate the used vcpus, memory and disk based on VM flavors'''
vms = model.get_mapping().get_node_vms(hypervisor)
"""Calculate the used vcpus, memory and disk based on VM flavors"""
vms = self.model.get_mapping().get_node_vms(hypervisor)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for vm_id in vms:
vm = model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
vcpus_used += cap_cores.get_capacity(vm)
memory_mb_used += cap_mem.get_capacity(vm)
disk_gb_used += cap_disk.get_capacity(vm)
return vcpus_used, memory_mb_used, disk_gb_used
def choose_vm_to_migrate(self, model, hosts, avg_workload, workload_cache):
"""pick up an active vm instance to migrate from provided hosts
def choose_vm_to_migrate(self, hosts, avg_workload, workload_cache):
"""Pick up an active vm instance to migrate from provided hosts
:param model: it's the origin_model passed from 'execute' function
:param hosts: the array of dict which contains hypervisor object
:param avg_workload: the average workload value of all hypervisors
:param workload_cache: the map contains vm to workload mapping
"""
for hvmap in hosts:
source_hypervisor = hvmap['hv']
source_vms = model.get_mapping().get_node_vms(source_hypervisor)
source_vms = self.model.get_mapping().get_node_vms(
source_hypervisor)
if source_vms:
delta_workload = hvmap['workload'] - avg_workload
min_delta = 1000000
@@ -149,7 +139,7 @@ class WorkloadBalance(base.BaseStrategy):
for vm_id in source_vms:
try:
# select the first active VM to migrate
vm = model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.debug("VM not active, skipped: %s",
vm.uuid)
@@ -161,18 +151,20 @@ class WorkloadBalance(base.BaseStrategy):
except wexc.InstanceNotFound:
LOG.error(_LE("VM not found Error: %s"), vm_id)
if instance_id:
return source_hypervisor, model.get_vm_from_id(instance_id)
return source_hypervisor, self.model.get_vm_from_id(
instance_id)
else:
LOG.info(_LI("VM not found from hypervisor: %s"),
source_hypervisor.uuid)
def filter_destination_hosts(self, model, hosts, vm_to_migrate,
def filter_destination_hosts(self, hosts, vm_to_migrate,
avg_workload, workload_cache):
'''Only return hosts with sufficient available resources'''
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cap_cores = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores)
cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = self.model.get_resource_from_id(resource.ResourceType.memory)
required_cores = cap_cores.get_capacity(vm_to_migrate)
required_disk = cap_disk.get_capacity(vm_to_migrate)
@@ -186,20 +178,22 @@ class WorkloadBalance(base.BaseStrategy):
workload = hvmap['workload']
# calculate the available resources
cores_used, mem_used, disk_used = self.calculate_used_resource(
model, host, cap_cores, cap_mem, cap_disk)
host, cap_cores, cap_mem, cap_disk)
cores_available = cap_cores.get_capacity(host) - cores_used
disk_available = cap_disk.get_capacity(host) - disk_used
mem_available = cap_mem.get_capacity(host) - mem_used
if (cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem and
(src_vm_workload + workload) < self.threshold / 100 *
cap_cores.get_capacity(host)):
if (
cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem and
(src_vm_workload + workload) < self.threshold / 100 *
cap_cores.get_capacity(host)
):
destination_hosts.append(hvmap)
return destination_hosts
def group_hosts_by_cpu_util(self, model):
def group_hosts_by_cpu_util(self):
"""Calculate the workloads of each hypervisor
try to find out the hypervisors which have reached threshold
@@ -208,12 +202,13 @@ class WorkloadBalance(base.BaseStrategy):
and also generate the VM workload map.
"""
hypervisors = model.get_all_hypervisors()
hypervisors = self.model.get_all_hypervisors()
cluster_size = len(hypervisors)
if not hypervisors:
raise wexc.ClusterEmpty()
# get cpu cores capacity of hypervisors and vms
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_cores = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores)
overload_hosts = []
nonoverload_hosts = []
# total workload of cluster
@@ -222,19 +217,20 @@ class WorkloadBalance(base.BaseStrategy):
# use workload_cache to store the workload of VMs for reuse purpose
workload_cache = {}
for hypervisor_id in hypervisors:
hypervisor = model.get_hypervisor_from_id(hypervisor_id)
vms = model.get_mapping().get_node_vms(hypervisor)
hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
vms = self.model.get_mapping().get_node_vms(hypervisor)
hypervisor_workload = 0.0
for vm_id in vms:
vm = model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
try:
cpu_util = self.ceilometer.statistic_aggregation(
resource_id=vm_id,
meter_name=self._meter,
period=self._period,
aggregate='avg')
except Exception as e:
LOG.error(_LE("Can not get cpu_util: %s"), e.message)
except Exception as exc:
LOG.exception(exc)
LOG.error(_LE("Can not get cpu_util"))
continue
if cpu_util is None:
LOG.debug("%s: cpu_util is None", vm_id)
@@ -260,15 +256,23 @@ class WorkloadBalance(base.BaseStrategy):
return overload_hosts, nonoverload_hosts, avg_workload, workload_cache
def execute(self, origin_model):
def pre_execute(self):
"""Pre-execution phase
This can be used to fetch some pre-requisites or data.
"""
LOG.info(_LI("Initializing Workload Balance Strategy"))
if origin_model is None:
if self.model is None:
raise wexc.ClusterStateNotDefined()
current_model = origin_model
def do_execute(self):
"""Strategy execution phase
This phase is where you should put the main logic of your strategy.
"""
src_hypervisors, target_hypervisors, avg_workload, workload_cache = (
self.group_hosts_by_cpu_util(current_model))
self.group_hosts_by_cpu_util())
if not src_hypervisors:
LOG.debug("No hosts require optimization")
@@ -286,19 +290,14 @@ class WorkloadBalance(base.BaseStrategy):
reverse=True,
key=lambda x: (x[self.METER_NAME]))
vm_to_migrate = self.choose_vm_to_migrate(current_model,
src_hypervisors,
avg_workload,
workload_cache)
vm_to_migrate = self.choose_vm_to_migrate(
src_hypervisors, avg_workload, workload_cache)
if not vm_to_migrate:
return self.solution
source_hypervisor, vm_src = vm_to_migrate
# find the hosts that have enough resource for the VM to be migrated
destination_hosts = self.filter_destination_hosts(current_model,
target_hypervisors,
vm_src,
avg_workload,
workload_cache)
destination_hosts = self.filter_destination_hosts(
target_hypervisors, vm_src, avg_workload, workload_cache)
# sort the filtered result by workload
# pick up the lowest one as dest server
if not destination_hosts:
@@ -311,14 +310,18 @@ class WorkloadBalance(base.BaseStrategy):
# always use the host with lowerest CPU utilization
mig_dst_hypervisor = destination_hosts[0]['hv']
# generate solution to migrate the vm to the dest server,
if current_model.get_mapping().migrate_vm(vm_src,
source_hypervisor,
mig_dst_hypervisor):
if self.model.get_mapping().migrate_vm(vm_src, source_hypervisor,
mig_dst_hypervisor):
parameters = {'migration_type': 'live',
'src_hypervisor': source_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid}
self.solution.add_action(action_type=self.MIGRATION,
resource_id=vm_src.uuid,
input_parameters=parameters)
self.solution.model = current_model
return self.solution
def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
self.solution.model = self.model

View File

@@ -108,7 +108,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
MIGRATION = "migrate"
MEMOIZE = _set_memoize(CONF)
def __init__(self, config=None, osc=None):
def __init__(self, config, osc=None):
super(WorkloadStabilization, self).__init__(config, osc)
self._ceilometer = None
self._nova = None
@@ -164,17 +164,16 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus))
@MEMOIZE
def get_vm_load(self, vm_uuid, current_model):
def get_vm_load(self, vm_uuid):
"""Gathering vm load through ceilometer statistic.
:param vm_uuid: vm for which statistic is gathered.
:param current_model: the cluster model
:return: dict
"""
LOG.debug(_LI('get_vm_load started'))
vm_vcpus = current_model.get_resource_from_id(
vm_vcpus = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(
current_model.get_vm_from_id(vm_uuid))
self.model.get_vm_from_id(vm_uuid))
vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus}
for meter in self.metrics:
avg_meter = self.ceilometer.statistic_aggregation(
@@ -189,25 +188,25 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
vm_load[meter] = avg_meter
return vm_load
def normalize_hosts_load(self, hosts, current_model):
def normalize_hosts_load(self, hosts):
normalized_hosts = deepcopy(hosts)
for host in normalized_hosts:
if 'memory.resident' in normalized_hosts[host]:
h_memory = current_model.get_resource_from_id(
h_memory = self.model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(
current_model.get_hypervisor_from_id(host))
self.model.get_hypervisor_from_id(host))
normalized_hosts[host]['memory.resident'] /= float(h_memory)
return normalized_hosts
def get_hosts_load(self, current_model):
def get_hosts_load(self):
"""Get load of every host by gathering vms load"""
hosts_load = {}
for hypervisor_id in current_model.get_all_hypervisors():
for hypervisor_id in self.model.get_all_hypervisors():
hosts_load[hypervisor_id] = {}
host_vcpus = current_model.get_resource_from_id(
host_vcpus = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(
current_model.get_hypervisor_from_id(hypervisor_id))
self.model.get_hypervisor_from_id(hypervisor_id))
hosts_load[hypervisor_id]['vcpus'] = host_vcpus
for metric in self.metrics:
@@ -250,8 +249,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
" for %s in weight dict.") % metric)
return weighted_sd
def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id,
current_model):
def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id):
"""Calculate migration case
Return list of standard deviation values, that appearing in case of
@@ -260,12 +258,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
:param vm_id: the virtual machine
:param src_hp_id: the source hypervisor id
:param dst_hp_id: the destination hypervisor id
:param current_model: the cluster model
:return: list of standard deviation values
"""
migration_case = []
new_hosts = deepcopy(hosts)
vm_load = self.get_vm_load(vm_id, current_model)
vm_load = self.get_vm_load(vm_id)
d_host_vcpus = new_hosts[dst_hp_id]['vcpus']
s_host_vcpus = new_hosts[src_hp_id]['vcpus']
for metric in self.metrics:
@@ -279,13 +276,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
else:
new_hosts[src_hp_id][metric] -= vm_load[metric]
new_hosts[dst_hp_id][metric] += vm_load[metric]
normalized_hosts = self.normalize_hosts_load(new_hosts, current_model)
normalized_hosts = self.normalize_hosts_load(new_hosts)
for metric in self.metrics:
migration_case.append(self.get_sd(normalized_hosts, metric))
migration_case.append(new_hosts)
return migration_case
def simulate_migrations(self, current_model, hosts):
def simulate_migrations(self, hosts):
"""Make sorted list of pairs vm:dst_host"""
def yield_hypervisors(hypervisors):
ct = CONF['watcher_strategies.workload_stabilization'].retry_count
@@ -300,23 +297,22 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
yield hypervisors
vm_host_map = []
for source_hp_id in current_model.get_all_hypervisors():
hypervisors = list(current_model.get_all_hypervisors())
for source_hp_id in self.model.get_all_hypervisors():
hypervisors = list(self.model.get_all_hypervisors())
hypervisors.remove(source_hp_id)
hypervisor_list = yield_hypervisors(hypervisors)
vms_id = current_model.get_mapping(). \
vms_id = self.model.get_mapping(). \
get_node_vms_from_id(source_hp_id)
for vm_id in vms_id:
min_sd_case = {'value': len(self.metrics)}
vm = current_model.get_vm_from_id(vm_id)
vm = self.model.get_vm_from_id(vm_id)
if vm.state not in [vm_state.VMState.ACTIVE.value,
vm_state.VMState.PAUSED.value]:
continue
for dst_hp_id in next(hypervisor_list):
sd_case = self.calculate_migration_case(hosts, vm_id,
source_hp_id,
dst_hp_id,
current_model)
dst_hp_id)
weighted_sd = self.calculate_weighted_sd(sd_case[:-1])
@@ -327,14 +323,14 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
break
return sorted(vm_host_map, key=lambda x: x['value'])
def check_threshold(self, current_model):
def check_threshold(self):
"""Check if cluster is needed in balancing"""
hosts_load = self.get_hosts_load(current_model)
normalized_load = self.normalize_hosts_load(hosts_load, current_model)
hosts_load = self.get_hosts_load()
normalized_load = self.normalize_hosts_load(hosts_load)
for metric in self.metrics:
metric_sd = self.get_sd(normalized_load, metric)
if metric_sd > float(self.thresholds[metric]):
return self.simulate_migrations(current_model, hosts_load)
return self.simulate_migrations(hosts_load)
def add_migration(self,
resource_id,
@@ -348,58 +344,57 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
resource_id=resource_id,
input_parameters=parameters)
def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor,
def create_migration_vm(self, mig_vm, mig_src_hypervisor,
mig_dst_hypervisor):
"""Create migration VM """
if current_model.get_mapping().migrate_vm(
if self.model.get_mapping().migrate_vm(
mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
self.add_migration(mig_vm.uuid, 'live',
mig_src_hypervisor.uuid,
mig_dst_hypervisor.uuid)
def migrate(self, current_model, vm_uuid, src_host, dst_host):
mig_vm = current_model.get_vm_from_id(vm_uuid)
mig_src_hypervisor = current_model.get_hypervisor_from_id(src_host)
mig_dst_hypervisor = current_model.get_hypervisor_from_id(dst_host)
self.create_migration_vm(current_model, mig_vm, mig_src_hypervisor,
def migrate(self, vm_uuid, src_host, dst_host):
mig_vm = self.model.get_vm_from_id(vm_uuid)
mig_src_hypervisor = self.model.get_hypervisor_from_id(src_host)
mig_dst_hypervisor = self.model.get_hypervisor_from_id(dst_host)
self.create_migration_vm(mig_vm, mig_src_hypervisor,
mig_dst_hypervisor)
def fill_solution(self, current_model):
self.solution.model = current_model
def fill_solution(self):
self.solution.model = self.model
self.solution.efficacy = 100
return self.solution
def execute(self, orign_model):
def pre_execute(self):
LOG.info(_LI("Initializing Workload Stabilization"))
current_model = orign_model
if orign_model is None:
if self.model is None:
raise exception.ClusterStateNotDefined()
migration = self.check_threshold(current_model)
def do_execute(self):
migration = self.check_threshold()
if migration:
hosts_load = self.get_hosts_load(current_model)
hosts_load = self.get_hosts_load()
min_sd = 1
balanced = False
for vm_host in migration:
dst_hp_disk = current_model.get_resource_from_id(
dst_hp_disk = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(
current_model.get_hypervisor_from_id(vm_host['host']))
vm_disk = current_model.get_resource_from_id(
self.model.get_hypervisor_from_id(vm_host['host']))
vm_disk = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(
current_model.get_vm_from_id(vm_host['vm']))
self.model.get_vm_from_id(vm_host['vm']))
if vm_disk > dst_hp_disk:
continue
vm_load = self.calculate_migration_case(hosts_load,
vm_host['vm'],
vm_host['s_host'],
vm_host['host'],
current_model)
vm_host['host'])
weighted_sd = self.calculate_weighted_sd(vm_load[:-1])
if weighted_sd < min_sd:
min_sd = weighted_sd
hosts_load = vm_load[-1]
self.migrate(current_model, vm_host['vm'],
self.migrate(vm_host['vm'],
vm_host['s_host'], vm_host['host'])
for metric, value in zip(self.metrics, vm_load[:-1]):
@@ -408,4 +403,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
break
if balanced:
break
return self.fill_solution(current_model)
return self.fill_solution()
def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
self.solution.model = self.model

View File

@@ -27,6 +27,7 @@ CONF = cfg.CONF
class CollectorManager(object):
def get_cluster_model_collector(self, osc=None):
""":param osc: an OpenStackClients instance"""
nova = nova_helper.NovaHelper(osc=osc)

View File

@@ -36,10 +36,11 @@ class SolutionFaker(object):
def build():
metrics = fake.FakerMetricsCollector()
current_state_cluster = faker_cluster_state.FakerModelCollector()
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.\
MagicMock(get_statistics=metrics.mock_get_statistics)
return sercon.execute(current_state_cluster.generate_scenario_1())
sercon = strategies.BasicConsolidation(config=mock.Mock())
sercon._model = current_state_cluster.generate_scenario_1()
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
return sercon.execute()
class SolutionFakerSingleHyp(object):
@@ -47,12 +48,12 @@ class SolutionFakerSingleHyp(object):
def build():
metrics = fake.FakerMetricsCollector()
current_state_cluster = faker_cluster_state.FakerModelCollector()
sercon = strategies.BasicConsolidation()
sercon.ceilometer = \
mock.MagicMock(get_statistics=metrics.mock_get_statistics)
return sercon.execute(
sercon = strategies.BasicConsolidation(config=mock.Mock())
sercon._model = (
current_state_cluster.generate_scenario_3_with_2_hypervisors())
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
return sercon.execute()
class TestActionScheduling(base.DbTestCase):

View File

@@ -13,6 +13,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from watcher.decision_engine.solution.default import DefaultSolution
@@ -20,10 +21,7 @@ from watcher.decision_engine.strategy.context.default import \
DefaultStrategyContext
from watcher.decision_engine.strategy.selection.default import \
DefaultStrategySelector
from watcher.decision_engine.strategy.strategies.dummy_strategy import \
DummyStrategy
from watcher.metrics_engine.cluster_model_collector.manager import \
CollectorManager
from watcher.decision_engine.strategy.strategies import dummy_strategy
from watcher.tests.db import base
from watcher.tests.objects import utils as obj_utils
@@ -32,18 +30,19 @@ class TestStrategyContext(base.DbTestCase):
def setUp(self):
super(TestStrategyContext, self).setUp()
obj_utils.create_test_goal(self.context, id=1, name="DUMMY")
audit_template = obj_utils.create_test_audit_template(
self.context)
audit_template = obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(
self.context, audit_template_id=audit_template.id)
strategy_context = DefaultStrategyContext()
@mock.patch.object(dummy_strategy.DummyStrategy, 'model',
new_callable=mock.PropertyMock)
@mock.patch.object(DefaultStrategySelector, 'select')
@mock.patch.object(CollectorManager, "get_cluster_model_collector",
mock.Mock())
def test_execute_strategy(self, mock_call):
mock_call.return_value = DummyStrategy()
def test_execute_strategy(self, mock_call, m_model):
m_model.return_value = mock.Mock()
mock_call.return_value = dummy_strategy.DummyStrategy(
config=mock.Mock())
solution = self.strategy_context.execute_strategy(
self.audit.uuid, self.context)
self.assertIsInstance(solution, DefaultSolution)

View File

@@ -31,11 +31,30 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestBasicConsolidation(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
fake_cluster = faker_cluster_state.FakerModelCollector()
def setUp(self):
super(TestBasicConsolidation, self).setUp()
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.BasicConsolidation, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.BasicConsolidation, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.BasicConsolidation(config=mock.Mock())
def test_cluster_size(self):
size_cluster = len(
@@ -44,134 +63,98 @@ class TestBasicConsolidation(base.BaseTestCase):
self.assertEqual(size_cluster_assert, size_cluster)
def test_basic_consolidation_score_hypervisor(self):
cluster = self.fake_cluster.generate_scenario_1()
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
node_1_score = 0.023333333333333317
self.assertEqual(node_1_score, sercon.calculate_score_node(
cluster.get_hypervisor_from_id("Node_1"),
cluster))
self.assertEqual(node_1_score, self.strategy.calculate_score_node(
model.get_hypervisor_from_id("Node_1")))
node_2_score = 0.26666666666666666
self.assertEqual(node_2_score, sercon.calculate_score_node(
cluster.get_hypervisor_from_id("Node_2"),
cluster))
self.assertEqual(node_2_score, self.strategy.calculate_score_node(
model.get_hypervisor_from_id("Node_2")))
node_0_score = 0.023333333333333317
self.assertEqual(node_0_score, sercon.calculate_score_node(
cluster.get_hypervisor_from_id("Node_0"),
cluster))
self.assertEqual(node_0_score, self.strategy.calculate_score_node(
model.get_hypervisor_from_id("Node_0")))
def test_basic_consolidation_score_vm(self):
cluster = self.fake_cluster.generate_scenario_1()
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
vm_0 = model.get_vm_from_id("VM_0")
vm_0_score = 0.023333333333333317
self.assertEqual(vm_0_score, sercon.calculate_score_vm(vm_0, cluster))
self.assertEqual(vm_0_score, self.strategy.calculate_score_vm(vm_0))
vm_1 = cluster.get_vm_from_id("VM_1")
vm_1 = model.get_vm_from_id("VM_1")
vm_1_score = 0.023333333333333317
self.assertEqual(vm_1_score, sercon.calculate_score_vm(vm_1, cluster))
vm_2 = cluster.get_vm_from_id("VM_2")
self.assertEqual(vm_1_score, self.strategy.calculate_score_vm(vm_1))
vm_2 = model.get_vm_from_id("VM_2")
vm_2_score = 0.033333333333333326
self.assertEqual(vm_2_score, sercon.calculate_score_vm(vm_2, cluster))
vm_6 = cluster.get_vm_from_id("VM_6")
self.assertEqual(vm_2_score, self.strategy.calculate_score_vm(vm_2))
vm_6 = model.get_vm_from_id("VM_6")
vm_6_score = 0.02666666666666669
self.assertEqual(vm_6_score, sercon.calculate_score_vm(vm_6, cluster))
vm_7 = cluster.get_vm_from_id("VM_7")
self.assertEqual(vm_6_score, self.strategy.calculate_score_vm(vm_6))
vm_7 = model.get_vm_from_id("VM_7")
vm_7_score = 0.013333333333333345
self.assertEqual(vm_7_score, sercon.calculate_score_vm(vm_7, cluster))
self.assertEqual(vm_7_score, self.strategy.calculate_score_vm(vm_7))
def test_basic_consolidation_score_vm_disk(self):
cluster = self.fake_cluster.generate_scenario_5_with_vm_disk_0()
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
model = self.fake_cluster.generate_scenario_5_with_vm_disk_0()
self.m_model.return_value = model
vm_0 = model.get_vm_from_id("VM_0")
vm_0_score = 0.023333333333333355
self.assertEqual(vm_0_score, sercon.calculate_score_vm(vm_0, cluster))
self.assertEqual(vm_0_score, self.strategy.calculate_score_vm(vm_0, ))
def test_basic_consolidation_weight(self):
cluster = self.fake_cluster.generate_scenario_1()
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
vm_0 = model.get_vm_from_id("VM_0")
cores = 16
# 80 Go
disk = 80
# mem 8 Go
mem = 8
vm_0_weight_assert = 3.1999999999999997
self.assertEqual(vm_0_weight_assert,
sercon.calculate_weight(cluster, vm_0, cores, disk,
mem))
self.assertEqual(
vm_0_weight_assert,
self.strategy.calculate_weight(vm_0, cores, disk, mem))
def test_calculate_migration_efficacy(self):
sercon = strategies.BasicConsolidation()
sercon.calculate_migration_efficacy()
self.strategy.calculate_migration_efficacy()
def test_exception_model(self):
sercon = strategies.BasicConsolidation()
self.assertRaises(exception.ClusterStateNotDefined, sercon.execute,
None)
self.m_model.return_value = None
self.assertRaises(
exception.ClusterStateNotDefined, self.strategy.execute)
def test_exception_cluster_empty(self):
sercon = strategies.BasicConsolidation()
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, sercon.execute,
model)
def test_calculate_score_vm_raise_cluster_state_not_found(self):
metrics = faker_metrics_collector.FakerMetricsCollector()
metrics.empty_one_metric("CPU_COMPUTE")
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.assertRaises(exception.ClusterStateNotDefined,
sercon.calculate_score_vm, "VM_1", None)
self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_check_migration(self):
sercon = strategies.BasicConsolidation()
fake_cluster = faker_cluster_state.FakerModelCollector()
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
self.m_model.return_value = model
all_vms = model.get_all_vms()
all_hyps = model.get_all_hypervisors()
vm0 = all_vms[list(all_vms.keys())[0]]
hyp0 = all_hyps[list(all_hyps.keys())[0]]
sercon.check_migration(model, hyp0, hyp0, vm0)
self.strategy.check_migration(hyp0, hyp0, vm0)
def test_threshold(self):
sercon = strategies.BasicConsolidation()
fake_cluster = faker_cluster_state.FakerModelCollector()
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
self.m_model.return_value = model
all_hyps = model.get_all_hypervisors()
hyp0 = all_hyps[list(all_hyps.keys())[0]]
sercon.check_threshold(model, hyp0, 1000, 1000, 1000)
threshold_cores = sercon.get_threshold_cores()
sercon.set_threshold_cores(threshold_cores + 1)
self.assertEqual(threshold_cores + 1, sercon.get_threshold_cores())
def test_number_of(self):
sercon = strategies.BasicConsolidation()
sercon.get_number_of_released_nodes()
sercon.get_number_of_migrations()
self.assertFalse(self.strategy.check_threshold(
hyp0, 1000, 1000, 1000))
def test_basic_consolidation_migration(self):
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
self.m_model.return_value = model
solution = sercon.execute(
self.fake_cluster.generate_scenario_3_with_2_hypervisors())
solution = self.strategy.execute()
actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions])
@@ -187,27 +170,22 @@ class TestBasicConsolidation(base.BaseTestCase):
# calculate_weight
def test_execute_no_workload(self):
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = (
self.fake_cluster
.generate_scenario_4_with_1_hypervisor_no_vm())
self.m_model.return_value = model
current_state_cluster = faker_cluster_state.FakerModelCollector()
model = current_state_cluster. \
generate_scenario_4_with_1_hypervisor_no_vm()
with mock.patch.object(strategies.BasicConsolidation,
'calculate_weight') \
as mock_score_call:
with mock.patch.object(
strategies.BasicConsolidation, 'calculate_weight'
) as mock_score_call:
mock_score_call.return_value = 0
solution = sercon.execute(model)
self.assertEqual(100, solution.efficacy)
solution = self.strategy.execute()
self.assertEqual(0, solution.efficacy)
def test_check_parameters(self):
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
solution = sercon.execute(
self.fake_cluster.generate_scenario_3_with_2_hypervisors())
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
self.m_model.return_value = model
solution = self.strategy.execute()
loader = default.DefaultActionLoader()
for action in solution.actions:
loaded_action = loader.load(action['action_type'])

View File

@@ -14,7 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from watcher.applier.actions.loading import default
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies import \
@@ -22,18 +25,33 @@ from watcher.tests.decision_engine.strategy.strategies import \
class TestDummyStrategy(base.TestCase):
def setUp(self):
super(TestDummyStrategy, self).setUp()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.DummyStrategy, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
self.m_model.return_value = model_root.ModelRoot()
self.strategy = strategies.DummyStrategy(config=mock.Mock())
self.m_model.return_value = model_root.ModelRoot()
self.strategy = strategies.DummyStrategy(config=mock.Mock())
def test_dummy_strategy(self):
dummy = strategies.DummyStrategy()
fake_cluster = faker_cluster_state.FakerModelCollector()
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = dummy.execute(model)
dummy = strategies.DummyStrategy(config=mock.Mock())
solution = dummy.execute()
self.assertEqual(3, len(solution.actions))
def test_check_parameters(self):
dummy = strategies.DummyStrategy()
fake_cluster = faker_cluster_state.FakerModelCollector()
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = dummy.execute(model)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
self.m_model.return_value = model
solution = self.strategy.execute()
loader = default.DefaultActionLoader()
for action in solution.actions:
loaded_action = loader.load(action['action_type'])

View File

@@ -32,93 +32,95 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestOutletTempControl(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
fake_cluster = faker_cluster_state.FakerModelCollector()
def setUp(self):
super(TestOutletTempControl, self).setUp()
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.OutletTempControl, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.OutletTempControl, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.OutletTempControl(config=mock.Mock())
def test_calc_used_res(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl()
self.m_model.return_value = model
hypervisor = model.get_hypervisor_from_id('Node_0')
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cores_used, mem_used, disk_used = strategy.calc_used_res(model,
hypervisor,
cap_cores,
cap_mem,
cap_disk)
cores_used, mem_used, disk_used = self.strategy.calc_used_res(
hypervisor, cap_cores, cap_mem, cap_disk)
self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used))
def test_group_hosts_by_outlet_temp(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
self.m_model.return_value = model
h1, h2 = self.strategy.group_hosts_by_outlet_temp()
self.assertEqual('Node_1', h1[0]['hv'].uuid)
self.assertEqual('Node_0', h2[0]['hv'].uuid)
def test_choose_vm_to_migrate(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
self.m_model.return_value = model
h1, h2 = self.strategy.group_hosts_by_outlet_temp()
vm_to_mig = self.strategy.choose_vm_to_migrate(h1)
self.assertEqual('Node_1', vm_to_mig[0].uuid)
self.assertEqual('a4cab39b-9828-413a-bf88-f76921bf1517',
vm_to_mig[1].uuid)
def test_filter_dest_servers(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
dest_hosts = strategy.filter_dest_servers(model, h2, vm_to_mig[1])
self.m_model.return_value = model
h1, h2 = self.strategy.group_hosts_by_outlet_temp()
vm_to_mig = self.strategy.choose_vm_to_migrate(h1)
dest_hosts = self.strategy.filter_dest_servers(h2, vm_to_mig[1])
self.assertEqual(1, len(dest_hosts))
self.assertEqual('Node_0', dest_hosts[0]['hv'].uuid)
def test_exception_model(self):
strategy = strategies.OutletTempControl()
self.assertRaises(exception.ClusterStateNotDefined, strategy.execute,
None)
self.m_model.return_value = None
self.assertRaises(
exception.ClusterStateNotDefined, self.strategy.execute)
def test_exception_cluster_empty(self):
strategy = strategies.OutletTempControl()
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_cluster_empty(self):
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_no_workload(self):
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_4_with_1_hypervisor_no_vm()
self.m_model.return_value = model
current_state_cluster = faker_cluster_state.FakerModelCollector()
model = current_state_cluster. \
generate_scenario_4_with_1_hypervisor_no_vm()
solution = strategy.execute(model)
solution = self.strategy.execute()
self.assertEqual([], solution.actions)
def test_execute(self):
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = strategy.execute(model)
self.m_model.return_value = model
solution = self.strategy.execute()
actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions])
@@ -126,11 +128,9 @@ class TestOutletTempControl(base.BaseTestCase):
self.assertEqual(1, num_migrations)
def test_check_parameters(self):
outlet = strategies.OutletTempControl()
outlet.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = outlet.execute(model)
self.m_model.return_value = model
solution = self.strategy.execute()
loader = default.DefaultActionLoader()
for action in solution.actions:
loaded_action = loader.load(action['action_type'])

View File

@@ -17,213 +17,215 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_and_metrics
class TestSmartConsolidation(base.BaseTestCase):
fake_cluster = faker_cluster_and_metrics.FakerModelCollector()
class TestVMWorkloadConsolidation(base.BaseTestCase):
def setUp(self):
super(TestVMWorkloadConsolidation, self).setUp()
# fake cluster
self.fake_cluster = faker_cluster_and_metrics.FakerModelCollector()
p_model = mock.patch.object(
strategies.VMWorkloadConsolidation, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.VMWorkloadConsolidation, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
# fake metrics
self.fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
self.m_model.return_value)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.VMWorkloadConsolidation(config=mock.Mock())
def test_get_vm_utilization(self):
cluster = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
self.fake_metrics.model = model
vm_0 = model.get_vm_from_id("VM_0")
vm_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(vm_util,
strategy.get_vm_utilization(vm_0.uuid, cluster))
self.strategy.get_vm_utilization(vm_0.uuid, model))
def test_get_hypervisor_utilization(self):
cluster = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
node_0 = cluster.get_hypervisor_from_id("Node_0")
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
self.fake_metrics.model = model
node_0 = model.get_hypervisor_from_id("Node_0")
node_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(node_util,
strategy.get_hypervisor_utilization(node_0, cluster))
self.assertEqual(
node_util,
self.strategy.get_hypervisor_utilization(node_0, model))
def test_get_hypervisor_capacity(self):
cluster = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
node_0 = cluster.get_hypervisor_from_id("Node_0")
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
self.fake_metrics.model = model
node_0 = model.get_hypervisor_from_id("Node_0")
node_util = dict(cpu=40, ram=64, disk=250)
self.assertEqual(node_util,
strategy.get_hypervisor_capacity(node_0, cluster))
self.strategy.get_hypervisor_capacity(node_0, model))
def test_get_relative_hypervisor_utilization(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
hypervisor = model.get_hypervisor_from_id('Node_0')
rhu = strategy.get_relative_hypervisor_utilization(hypervisor, model)
rhu = self.strategy.get_relative_hypervisor_utilization(
hypervisor, model)
expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025}
self.assertEqual(expected_rhu, rhu)
def test_get_relative_cluster_utilization(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
cru = strategy.get_relative_cluster_utilization(model)
self.m_model.return_value = model
self.fake_metrics.model = model
cru = self.strategy.get_relative_cluster_utilization(model)
expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375}
self.assertEqual(expected_cru, cru)
def test_add_migration(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
strategy.add_migration(vm_uuid, h1, h2, model)
self.assertEqual(1, len(strategy.solution.actions))
self.strategy.add_migration(vm_uuid, h1, h2, model)
self.assertEqual(1, len(self.strategy.solution.actions))
expected = {'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'src_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': vm_uuid}}
self.assertEqual(expected, strategy.solution.actions[0])
self.assertEqual(expected, self.strategy.solution.actions[0])
def test_is_overloaded(self):
strategy = strategies.VMWorkloadConsolidation()
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h1 = model.get_hypervisor_from_id('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc)
res = self.strategy.is_overloaded(h1, model, cc)
self.assertEqual(False, res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc)
res = self.strategy.is_overloaded(h1, model, cc)
self.assertEqual(False, res)
cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc)
res = self.strategy.is_overloaded(h1, model, cc)
self.assertEqual(True, res)
def test_vm_fits(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = strategy.vm_fits(vm_uuid, h, model, cc)
res = self.strategy.vm_fits(vm_uuid, h, model, cc)
self.assertEqual(True, res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = strategy.vm_fits(vm_uuid, h, model, cc)
res = self.strategy.vm_fits(vm_uuid, h, model, cc)
self.assertEqual(False, res)
def test_add_action_activate_hypervisor(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h = model.get_hypervisor_from_id('Node_0')
strategy.add_action_activate_hypervisor(h)
self.strategy.add_action_activate_hypervisor(h)
expected = [{'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'up',
'resource_id': 'Node_0'}}]
self.assertEqual(expected, strategy.solution.actions)
self.assertEqual(expected, self.strategy.solution.actions)
def test_add_action_deactivate_hypervisor(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h = model.get_hypervisor_from_id('Node_0')
strategy.add_action_deactivate_hypervisor(h)
self.strategy.add_action_deactivate_hypervisor(h)
expected = [{'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'down',
'resource_id': 'Node_0'}}]
self.assertEqual(expected, strategy.solution.actions)
self.assertEqual(expected, self.strategy.solution.actions)
def test_deactivate_unused_hypervisors(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
strategy.deactivate_unused_hypervisors(model)
self.assertEqual(0, len(strategy.solution.actions))
self.strategy.deactivate_unused_hypervisors(model)
self.assertEqual(0, len(self.strategy.solution.actions))
# Migrate VM to free the hypervisor
strategy.add_migration(vm_uuid, h1, h2, model)
self.strategy.add_migration(vm_uuid, h1, h2, model)
strategy.deactivate_unused_hypervisors(model)
self.strategy.deactivate_unused_hypervisors(model)
expected = {'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'down',
'resource_id': 'Node_0'}}
self.assertEqual(2, len(strategy.solution.actions))
self.assertEqual(expected, strategy.solution.actions[1])
self.assertEqual(2, len(self.strategy.solution.actions))
self.assertEqual(expected, self.strategy.solution.actions[1])
def test_offload_phase(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc)
self.strategy.offload_phase(model, cc)
expected = []
self.assertEqual(expected, strategy.solution.actions)
self.assertEqual(expected, self.strategy.solution.actions)
def test_consolidation_phase(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.consolidation_phase(model, cc)
self.strategy.consolidation_phase(model, cc)
expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'src_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': vm_uuid}}]
self.assertEqual(expected, strategy.solution.actions)
self.assertEqual(expected, self.strategy.solution.actions)
def test_strategy(self):
model = self.fake_cluster.generate_scenario_2()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h1 = model.get_hypervisor_from_id('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc)
strategy.consolidation_phase(model, cc)
strategy.optimize_solution(model)
h2 = strategy.solution.actions[0][
self.strategy.offload_phase(model, cc)
self.strategy.consolidation_phase(model, cc)
self.strategy.optimize_solution(model)
h2 = self.strategy.solution.actions[0][
'input_parameters']['dst_hypervisor']
expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2,
@@ -236,18 +238,16 @@ class TestSmartConsolidation(base.BaseTestCase):
'migration_type': 'live',
'resource_id': 'VM_1'}}]
self.assertEqual(expected, strategy.solution.actions)
self.assertEqual(expected, self.strategy.solution.actions)
def test_strategy2(self):
model = self.fake_cluster.generate_scenario_3()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
self.m_model.return_value = model
self.fake_metrics.model = model
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc)
self.strategy.offload_phase(model, cc)
expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'migration_type': 'live',
@@ -263,15 +263,15 @@ class TestSmartConsolidation(base.BaseTestCase):
'migration_type': 'live',
'resource_id': 'VM_8',
'src_hypervisor': h1.uuid}}]
self.assertEqual(expected, strategy.solution.actions)
strategy.consolidation_phase(model, cc)
self.assertEqual(expected, self.strategy.solution.actions)
self.strategy.consolidation_phase(model, cc)
expected.append({'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': 'VM_7',
'src_hypervisor': h2.uuid}})
self.assertEqual(expected, strategy.solution.actions)
strategy.optimize_solution(model)
self.assertEqual(expected, self.strategy.solution.actions)
self.strategy.optimize_solution(model)
del expected[3]
del expected[1]
self.assertEqual(expected, strategy.solution.actions)
self.assertEqual(expected, self.strategy.solution.actions)

View File

@@ -23,7 +23,7 @@ from watcher.applier.actions.loading import default
from watcher.common import exception
from watcher.decision_engine.model import model_root
from watcher.decision_engine.model import resource
from watcher.decision_engine.strategy.strategies import workload_balance
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_state
@@ -32,31 +32,51 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestWorkloadBalance(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
fake_cluster = faker_cluster_state.FakerModelCollector()
def setUp(self):
super(TestWorkloadBalance, self).setUp()
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.WorkloadBalance, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.BasicConsolidation, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.WorkloadBalance(config=mock.Mock())
def test_calc_used_res(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
self.m_model.return_value = model
hypervisor = model.get_hypervisor_from_id('Node_0')
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cores_used, mem_used, disk_used = strategy.calculate_used_resource(
model, hypervisor, cap_cores, cap_mem, cap_disk)
cores_used, mem_used, disk_used = (
self.strategy.calculate_used_resource(
hypervisor, cap_cores, cap_mem, cap_disk))
self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40))
def test_group_hosts_by_cpu_util(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.threshold = 30
strategy.ceilometer = mock.MagicMock(
self.m_model.return_value = model
self.strategy.threshold = 30
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
# print h1, h2, avg, w_map
self.assertEqual(h1[0]['hv'].uuid, 'Node_0')
self.assertEqual(h2[0]['hv'].uuid, 'Node_1')
@@ -64,73 +84,69 @@ class TestWorkloadBalance(base.BaseTestCase):
def test_choose_vm_to_migrate(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map)
self.assertEqual(vm_to_mig[0].uuid, 'Node_0')
self.assertEqual(vm_to_mig[1].uuid,
"73b09e16-35b7-4922-804e-e8f5d9b740fc")
def test_choose_vm_notfound(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
vms = model.get_all_vms()
vms.clear()
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map)
self.assertEqual(vm_to_mig, None)
def test_filter_destination_hosts(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
dest_hosts = strategy.filter_destination_hosts(model, h2, vm_to_mig[1],
avg, w_map)
h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map)
dest_hosts = self.strategy.filter_destination_hosts(
h2, vm_to_mig[1], avg, w_map)
self.assertEqual(len(dest_hosts), 1)
self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_1')
def test_exception_model(self):
strategy = workload_balance.WorkloadBalance()
self.assertRaises(exception.ClusterStateNotDefined, strategy.execute,
None)
self.m_model.return_value = None
self.assertRaises(
exception.ClusterStateNotDefined, self.strategy.execute)
def test_exception_cluster_empty(self):
strategy = workload_balance.WorkloadBalance()
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_cluster_empty(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_no_workload(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
model = self.fake_cluster.generate_scenario_4_with_1_hypervisor_no_vm()
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
current_state_cluster = faker_cluster_state.FakerModelCollector()
model = current_state_cluster. \
generate_scenario_4_with_1_hypervisor_no_vm()
solution = strategy.execute(model)
solution = self.strategy.execute()
self.assertEqual(solution.actions, [])
def test_execute(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
solution = strategy.execute(model)
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
solution = self.strategy.execute()
actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions])
@@ -138,11 +154,11 @@ class TestWorkloadBalance(base.BaseTestCase):
self.assertEqual(num_migrations, 1)
def test_check_parameters(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
solution = strategy.execute(model)
self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
solution = self.strategy.execute()
loader = default.DefaultActionLoader()
for action in solution.actions:
loaded_action = loader.load(action['action_type'])

View File

@@ -19,6 +19,7 @@
import mock
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \
@@ -28,40 +29,48 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestWorkloadStabilization(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
fake_cluster = faker_cluster_state.FakerModelCollector()
def setUp(self):
super(TestWorkloadStabilization, self).setUp()
hosts_load_assert = {'Node_0':
{'cpu_util': 0.07, 'memory.resident': 7.0,
'vcpus': 40},
'Node_1':
{'cpu_util': 0.05, 'memory.resident': 5,
'vcpus': 40},
'Node_2':
{'cpu_util': 0.1, 'memory.resident': 29,
'vcpus': 40},
'Node_3':
{'cpu_util': 0.04, 'memory.resident': 8,
'vcpus': 40},
'Node_4':
{'cpu_util': 0.02, 'memory.resident': 4,
'vcpus': 40}}
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
self.hosts_load_assert = {
'Node_0': {'cpu_util': 0.07, 'memory.resident': 7.0, 'vcpus': 40},
'Node_1': {'cpu_util': 0.05, 'memory.resident': 5, 'vcpus': 40},
'Node_2': {'cpu_util': 0.1, 'memory.resident': 29, 'vcpus': 40},
'Node_3': {'cpu_util': 0.04, 'memory.resident': 8, 'vcpus': 40},
'Node_4': {'cpu_util': 0.02, 'memory.resident': 4, 'vcpus': 40}}
p_model = mock.patch.object(
strategies.WorkloadStabilization, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.WorkloadStabilization, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.WorkloadStabilization(config=mock.Mock())
def test_get_vm_load(self):
model = self.fake_cluster.generate_scenario_1()
sd = strategies.WorkloadStabilization()
sd.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
vm_0_dict = {'uuid': 'VM_0', 'vcpus': 10,
'cpu_util': 7, 'memory.resident': 2}
self.assertEqual(sd.get_vm_load("VM_0", model), vm_0_dict)
self.assertEqual(vm_0_dict, self.strategy.get_vm_load("VM_0"))
def test_normalize_hosts_load(self):
model = self.fake_cluster.generate_scenario_1()
sd = strategies.WorkloadStabilization()
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
fake_hosts = {'Node_0': {'cpu_util': 0.07, 'memory.resident': 7},
'Node_1': {'cpu_util': 0.05, 'memory.resident': 5}}
normalized_hosts = {'Node_0':
@@ -70,99 +79,82 @@ class TestWorkloadStabilization(base.BaseTestCase):
'Node_1':
{'cpu_util': 0.05,
'memory.resident': 0.03787878787878788}}
self.assertEqual(sd.normalize_hosts_load(fake_hosts, model),
normalized_hosts)
self.assertEqual(
normalized_hosts,
self.strategy.normalize_hosts_load(fake_hosts))
def test_get_hosts_load(self):
sd = strategies.WorkloadStabilization()
sd.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
self.assertEqual(
sd.get_hosts_load(self.fake_cluster.generate_scenario_1()),
self.strategy.get_hosts_load(),
self.hosts_load_assert)
def test_get_sd(self):
sd = strategies.WorkloadStabilization()
test_cpu_sd = 0.027
test_ram_sd = 9.3
self.assertEqual(
round(sd.get_sd(self.hosts_load_assert, 'cpu_util'), 3),
round(self.strategy.get_sd(
self.hosts_load_assert, 'cpu_util'), 3),
test_cpu_sd)
self.assertEqual(
round(sd.get_sd(self.hosts_load_assert, 'memory.resident'), 1),
round(self.strategy.get_sd(
self.hosts_load_assert, 'memory.resident'), 1),
test_ram_sd)
def test_calculate_weighted_sd(self):
    """calculate_weighted_sd() sums the per-metric deviations (0.5 + 0.75)."""
    sd_case = [0.5, 0.75]
    self.assertEqual(self.strategy.calculate_weighted_sd(sd_case), 1.25)
def test_calculate_migration_case(self):
    """Simulated migration of VM_5 Node_2->Node_1 updates Node_1's load."""
    self.m_model.return_value = self.fake_cluster.generate_scenario_1()
    # [-1] is the resulting hosts-load mapping after the simulated move.
    self.assertEqual(
        self.strategy.calculate_migration_case(
            self.hosts_load_assert, "VM_5",
            "Node_2", "Node_1")[-1]["Node_1"],
        {'cpu_util': 2.55, 'memory.resident': 21, 'vcpus': 40})
def test_simulate_migrations(self):
    """With 'retry' host choice the fixture yields 8 candidate migrations."""
    self.m_model.return_value = self.fake_cluster.generate_scenario_1()
    self.strategy.host_choice = 'retry'
    self.assertEqual(
        8,
        len(self.strategy.simulate_migrations(self.hosts_load_assert)))
def test_check_threshold(self):
    """check_threshold() is truthy when migrations are simulated and the
    weighted deviation exceeds the (deliberately tiny) thresholds."""
    self.m_model.return_value = self.fake_cluster.generate_scenario_1()
    self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2}
    self.strategy.simulate_migrations = mock.Mock(return_value=True)
    self.assertTrue(self.strategy.check_threshold())
def test_execute_one_migration(self):
    """execute() performs the single simulated migration via migrate().

    migrate() no longer takes the cluster model: the strategy fetches it
    itself, so the call is asserted with (vm, src, dst) only.
    """
    self.m_model.return_value = self.fake_cluster.generate_scenario_1()
    self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2}
    self.strategy.simulate_migrations = mock.Mock(
        return_value=[{'vm': 'VM_4', 's_host': 'Node_2', 'host': 'Node_1'}]
    )
    with mock.patch.object(self.strategy, 'migrate') as mock_migration:
        self.strategy.execute()
        mock_migration.assert_called_once_with(
            'VM_4', 'Node_2', 'Node_1')
def test_execute_multiply_migrations(self):
    """With several simulated candidates, execute() migrates only one VM
    per pass (the deviation is re-checked after each migration)."""
    self.m_model.return_value = self.fake_cluster.generate_scenario_1()
    self.strategy.thresholds = {'cpu_util': 0.00001,
                                'memory.resident': 0.0001}
    self.strategy.simulate_migrations = mock.Mock(
        return_value=[{'vm': 'VM_4', 's_host': 'Node_2', 'host': 'Node_1'},
                      {'vm': 'VM_3', 's_host': 'Node_2', 'host': 'Node_3'}]
    )
    with mock.patch.object(self.strategy, 'migrate') as mock_migrate:
        self.strategy.execute()
        self.assertEqual(mock_migrate.call_count, 1)
def test_execute_nothing_to_migrate(self):
    """When simulate_migrations() yields nothing, migrate() is never called."""
    self.m_model.return_value = self.fake_cluster.generate_scenario_1()
    self.strategy.thresholds = {'cpu_util': 0.042,
                                'memory.resident': 0.0001}
    self.strategy.simulate_migrations = mock.Mock(return_value=False)
    with mock.patch.object(self.strategy, 'migrate') as mock_migrate:
        self.strategy.execute()
        mock_migrate.assert_not_called()