Added pre/post execution methods to strategies

In this changeset, I broke down the execute() method so that it
sequentially calls 3 methods:

- pre_execute()
- do_execute()
- post_execute()

This changeset also removes the cluster model parameter from the
execute() method; the model is now exposed as a `model` property of the
strategy, which is lazily loaded whenever needed.

Partially Implements: blueprint efficacy-indicator

Change-Id: I2f697938db693acfa95b2c2fbecfdc1b733c93fd
This commit is contained in:
Vincent Françoise
2016-06-01 16:42:53 +02:00
parent 84b12f8f1e
commit 2b95a4cbc4
18 changed files with 764 additions and 756 deletions

View File

@@ -18,7 +18,6 @@ from oslo_log import log
from watcher.common import clients from watcher.common import clients
from watcher.decision_engine.strategy.context import base from watcher.decision_engine.strategy.context import base
from watcher.decision_engine.strategy.selection import default from watcher.decision_engine.strategy.selection import default
from watcher.metrics_engine.cluster_model_collector import manager
from watcher import objects from watcher import objects
@@ -30,11 +29,6 @@ class DefaultStrategyContext(base.BaseStrategyContext):
def __init__(self): def __init__(self):
super(DefaultStrategyContext, self).__init__() super(DefaultStrategyContext, self).__init__()
LOG.debug("Initializing Strategy Context") LOG.debug("Initializing Strategy Context")
self._collector_manager = manager.CollectorManager()
@property
def collector(self):
return self._collector_manager
def execute_strategy(self, audit_uuid, request_context): def execute_strategy(self, audit_uuid, request_context):
audit = objects.Audit.get_by_uuid(request_context, audit_uuid) audit = objects.Audit.get_by_uuid(request_context, audit_uuid)
@@ -46,10 +40,6 @@ class DefaultStrategyContext(base.BaseStrategyContext):
osc = clients.OpenStackClients() osc = clients.OpenStackClients()
# todo(jed) retrieve in audit_template parameters (threshold,...) # todo(jed) retrieve in audit_template parameters (threshold,...)
# todo(jed) create ActionPlan # todo(jed) create ActionPlan
collector_manager = self.collector.get_cluster_model_collector(osc=osc)
# todo(jed) remove call to get_latest_cluster_data_model
cluster_data_model = collector_manager.get_latest_cluster_data_model()
strategy_selector = default.DefaultStrategySelector( strategy_selector = default.DefaultStrategySelector(
goal_name=objects.Goal.get_by_id( goal_name=objects.Goal.get_by_id(
@@ -59,5 +49,4 @@ class DefaultStrategyContext(base.BaseStrategyContext):
selected_strategy = strategy_selector.select() selected_strategy = strategy_selector.select()
# todo(jed) add parameters and remove cluster_data_model return selected_strategy.execute()
return selected_strategy.execute(cluster_data_model)

View File

@@ -20,14 +20,16 @@ from watcher.decision_engine.strategy.strategies import dummy_strategy
from watcher.decision_engine.strategy.strategies import outlet_temp_control from watcher.decision_engine.strategy.strategies import outlet_temp_control
from watcher.decision_engine.strategy.strategies import \ from watcher.decision_engine.strategy.strategies import \
vm_workload_consolidation vm_workload_consolidation
from watcher.decision_engine.strategy.strategies import workload_balance
from watcher.decision_engine.strategy.strategies import workload_stabilization from watcher.decision_engine.strategy.strategies import workload_stabilization
BasicConsolidation = basic_consolidation.BasicConsolidation BasicConsolidation = basic_consolidation.BasicConsolidation
OutletTempControl = outlet_temp_control.OutletTempControl OutletTempControl = outlet_temp_control.OutletTempControl
DummyStrategy = dummy_strategy.DummyStrategy DummyStrategy = dummy_strategy.DummyStrategy
VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
WorkloadBalance = workload_balance.WorkloadBalance
WorkloadStabilization = workload_stabilization.WorkloadStabilization WorkloadStabilization = workload_stabilization.WorkloadStabilization
__all__ = ("BasicConsolidation", "OutletTempControl", __all__ = ("BasicConsolidation", "OutletTempControl", "DummyStrategy",
"DummyStrategy", "VMWorkloadConsolidation", "VMWorkloadConsolidation", "WorkloadBalance",
"WorkloadStabilization") "WorkloadStabilization")

View File

@@ -39,12 +39,12 @@ which are dynamically loaded by Watcher at launch time.
import abc import abc
import six import six
from watcher._i18n import _
from watcher.common import clients from watcher.common import clients
from watcher.common.loader import loadable from watcher.common.loader import loadable
from watcher.decision_engine.loading import default as loading from watcher.decision_engine.loading import default as loading
from watcher.decision_engine.solution import default from watcher.decision_engine.solution import default
from watcher.decision_engine.strategy.common import level from watcher.decision_engine.strategy.common import level
from watcher.metrics_engine.cluster_model_collector import manager
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
@@ -56,7 +56,13 @@ class BaseStrategy(loadable.Loadable):
""" """
def __init__(self, config, osc=None): def __init__(self, config, osc=None):
""":param osc: an OpenStackClients instance""" """Constructor: the signature should be identical within the subclasses
:param config: Configuration related to this plugin
:type config: :py:class:`~.Struct`
:param osc: An OpenStackClients instance
:type osc: :py:class:`~.OpenStackClients` instance
"""
super(BaseStrategy, self).__init__(config) super(BaseStrategy, self).__init__(config)
self._name = self.get_name() self._name = self.get_name()
self._display_name = self.get_display_name() self._display_name = self.get_display_name()
@@ -66,6 +72,9 @@ class BaseStrategy(loadable.Loadable):
# the solution given by the strategy # the solution given by the strategy
self._solution = default.DefaultSolution() self._solution = default.DefaultSolution()
self._osc = osc self._osc = osc
self._collector_manager = None
self._model = None
self._goal = None
@classmethod @classmethod
@abc.abstractmethod @abc.abstractmethod
@@ -109,14 +118,60 @@ class BaseStrategy(loadable.Loadable):
return [] return []
@abc.abstractmethod @abc.abstractmethod
def execute(self, original_model): def pre_execute(self):
"""Pre-execution phase
This can be used to fetch some pre-requisites or data.
"""
raise NotImplementedError()
@abc.abstractmethod
def do_execute(self):
"""Strategy execution phase
This phase is where you should put the main logic of your strategy.
"""
raise NotImplementedError()
@abc.abstractmethod
def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
raise NotImplementedError()
def execute(self):
"""Execute a strategy """Execute a strategy
:param original_model: The model the strategy is executed on
:type model: str
:return: A computed solution (via a placement algorithm) :return: A computed solution (via a placement algorithm)
:rtype: :class:`watcher.decision_engine.solution.base.BaseSolution` :rtype: :py:class:`~.BaseSolution` instance
""" """
self.pre_execute()
self.do_execute()
self.post_execute()
return self.solution
@property
def collector(self):
if self._collector_manager is None:
self._collector_manager = manager.CollectorManager()
return self._collector_manager
@property
def model(self):
"""Cluster data model
:returns: Cluster data model the strategy is executed on
:rtype model: :py:class:`~.ModelRoot` instance
"""
if self._model is None:
collector = self.collector.get_cluster_model_collector(
osc=self.osc)
self._model = collector.get_latest_cluster_data_model()
return self._model
@property @property
def osc(self): def osc(self):
@@ -140,6 +195,10 @@ class BaseStrategy(loadable.Loadable):
def display_name(self): def display_name(self):
return self._display_name return self._display_name
@property
def goal(self):
return self._goal
@property @property
def strategy_level(self): def strategy_level(self):
return self._strategy_level return self._strategy_level
@@ -202,11 +261,3 @@ class WorkloadStabilizationBaseStrategy(BaseStrategy):
@classmethod @classmethod
def get_goal_name(cls): def get_goal_name(cls):
return "workload_balancing" return "workload_balancing"
@classmethod
def get_goal_display_name(cls):
return _("Workload balancing")
@classmethod
def get_translatable_goal_display_name(cls):
return "Workload balancing"

View File

@@ -71,11 +71,11 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
MIGRATION = "migrate" MIGRATION = "migrate"
CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state" CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
def __init__(self, config=None, osc=None): def __init__(self, config, osc=None):
"""Basic offline Consolidation using live migration """Basic offline Consolidation using live migration
:param config: A mapping containing the configuration of this strategy :param config: A mapping containing the configuration of this strategy
:type config: dict :type config: :py:class:`~.Struct` instance
:param osc: :py:class:`~.OpenStackClients` instance :param osc: :py:class:`~.OpenStackClients` instance
""" """
super(BasicConsolidation, self).__init__(config, osc) super(BasicConsolidation, self).__init__(config, osc)
@@ -134,17 +134,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
def compute_attempts(self, size_cluster): def compute_attempts(self, size_cluster):
"""Upper bound of the number of migration """Upper bound of the number of migration
:param size_cluster: :param size_cluster: The size of the cluster
""" """
self.migration_attempts = size_cluster * self.bound_migration self.migration_attempts = size_cluster * self.bound_migration
def check_migration(self, cluster_data_model, def check_migration(self, src_hypervisor, dest_hypervisor, vm_to_mig):
src_hypervisor, """Check if the migration is possible
dest_hypervisor,
vm_to_mig):
"""check if the migration is possible
:param cluster_data_model: the current state of the cluster :param self.model: the current state of the cluster
:param src_hypervisor: the current node of the virtual machine :param src_hypervisor: the current node of the virtual machine
:param dest_hypervisor: the destination of the virtual machine :param dest_hypervisor: the destination of the virtual machine
:param vm_to_mig: the virtual machine :param vm_to_mig: the virtual machine
@@ -153,24 +150,22 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
if src_hypervisor == dest_hypervisor: if src_hypervisor == dest_hypervisor:
return False return False
LOG.debug('Migrate VM {0} from {1} to {2} '.format(vm_to_mig, LOG.debug('Migrate VM %s from %s to %s',
src_hypervisor, vm_to_mig, src_hypervisor, dest_hypervisor)
dest_hypervisor,
))
total_cores = 0 total_cores = 0
total_disk = 0 total_disk = 0
total_mem = 0 total_mem = 0
cpu_capacity = cluster_data_model.get_resource_from_id( cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores) resource.ResourceType.cpu_cores)
disk_capacity = cluster_data_model.get_resource_from_id( disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk) resource.ResourceType.disk)
memory_capacity = cluster_data_model.get_resource_from_id( memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory) resource.ResourceType.memory)
for vm_id in cluster_data_model. \ for vm_id in self.model. \
get_mapping().get_node_vms(dest_hypervisor): get_mapping().get_node_vms(dest_hypervisor):
vm = cluster_data_model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
total_cores += cpu_capacity.get_capacity(vm) total_cores += cpu_capacity.get_capacity(vm)
total_disk += disk_capacity.get_capacity(vm) total_disk += disk_capacity.get_capacity(vm)
total_mem += memory_capacity.get_capacity(vm) total_mem += memory_capacity.get_capacity(vm)
@@ -180,42 +175,33 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
total_disk += disk_capacity.get_capacity(vm_to_mig) total_disk += disk_capacity.get_capacity(vm_to_mig)
total_mem += memory_capacity.get_capacity(vm_to_mig) total_mem += memory_capacity.get_capacity(vm_to_mig)
return self.check_threshold(cluster_data_model, return self.check_threshold(dest_hypervisor, total_cores, total_disk,
dest_hypervisor,
total_cores,
total_disk,
total_mem) total_mem)
def check_threshold(self, cluster_data_model, def check_threshold(self, dest_hypervisor, total_cores,
dest_hypervisor, total_disk, total_mem):
total_cores,
total_disk,
total_mem):
"""Check threshold """Check threshold
check the threshold value defined by the ratio of check the threshold value defined by the ratio of
aggregated CPU capacity of VMs on one node to CPU capacity aggregated CPU capacity of VMs on one node to CPU capacity
of this node must not exceed the threshold value. of this node must not exceed the threshold value.
:param cluster_data_model: the current state of the cluster :param self.model: the current state of the cluster
:param dest_hypervisor: the destination of the virtual machine :param dest_hypervisor: the destination of the virtual machine
:param total_cores :param total_cores
:param total_disk :param total_disk
:param total_mem :param total_mem
:return: True if the threshold is not exceed :return: True if the threshold is not exceed
""" """
cpu_capacity = cluster_data_model.get_resource_from_id( cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor) resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor)
disk_capacity = cluster_data_model.get_resource_from_id( disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(dest_hypervisor) resource.ResourceType.disk).get_capacity(dest_hypervisor)
memory_capacity = cluster_data_model.get_resource_from_id( memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(dest_hypervisor) resource.ResourceType.memory).get_capacity(dest_hypervisor)
if (cpu_capacity >= total_cores * self.threshold_cores and return (cpu_capacity >= total_cores * self.threshold_cores and
disk_capacity >= total_disk * self.threshold_disk and disk_capacity >= total_disk * self.threshold_disk and
memory_capacity >= total_mem * self.threshold_mem): memory_capacity >= total_mem * self.threshold_mem)
return True
else:
return False
def get_allowed_migration_attempts(self): def get_allowed_migration_attempts(self):
"""Allowed migration """Allowed migration
@@ -226,37 +212,24 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
""" """
return self.migration_attempts return self.migration_attempts
def get_threshold_cores(self): def calculate_weight(self, element, total_cores_used, total_disk_used,
return self.threshold_cores
def set_threshold_cores(self, threshold):
self.threshold_cores = threshold
def get_number_of_released_nodes(self):
return self.number_of_released_nodes
def get_number_of_migrations(self):
return self.number_of_migrations
def calculate_weight(self, cluster_data_model, element,
total_cores_used, total_disk_used,
total_memory_used): total_memory_used):
"""Calculate weight of every resource """Calculate weight of every resource
:param cluster_data_model: :param self.model:
:param element: :param element:
:param total_cores_used: :param total_cores_used:
:param total_disk_used: :param total_disk_used:
:param total_memory_used: :param total_memory_used:
:return: :return:
""" """
cpu_capacity = cluster_data_model.get_resource_from_id( cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(element) resource.ResourceType.cpu_cores).get_capacity(element)
disk_capacity = cluster_data_model.get_resource_from_id( disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(element) resource.ResourceType.disk).get_capacity(element)
memory_capacity = cluster_data_model.get_resource_from_id( memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(element) resource.ResourceType.memory).get_capacity(element)
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) / score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
@@ -275,20 +248,19 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
# todo(jed) take in account weight # todo(jed) take in account weight
return (score_cores + score_disk + score_memory) / 3 return (score_cores + score_disk + score_memory) / 3
def calculate_score_node(self, hypervisor, model): def calculate_score_node(self, hypervisor):
"""calculate the score that represent the utilization level """Calculate the score that represent the utilization level
:param hypervisor: :param hypervisor:
:param model: :return:
:return: """
"""
resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname) resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname)
host_avg_cpu_util = self.ceilometer. \ host_avg_cpu_util = self.ceilometer. \
statistic_aggregation(resource_id=resource_id, statistic_aggregation(resource_id=resource_id,
meter_name=self.HOST_CPU_USAGE_METRIC_NAME, meter_name=self.HOST_CPU_USAGE_METRIC_NAME,
period="7200", period="7200",
aggregate='avg' aggregate='avg')
)
if host_avg_cpu_util is None: if host_avg_cpu_util is None:
LOG.error( LOG.error(
_LE("No values returned by %(resource_id)s " _LE("No values returned by %(resource_id)s "
@@ -298,14 +270,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
) )
host_avg_cpu_util = 100 host_avg_cpu_util = 100
cpu_capacity = model.get_resource_from_id( cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(hypervisor) resource.ResourceType.cpu_cores).get_capacity(hypervisor)
total_cores_used = cpu_capacity * (host_avg_cpu_util / 100) total_cores_used = cpu_capacity * (host_avg_cpu_util / 100)
return self.calculate_weight(model, hypervisor, total_cores_used, return self.calculate_weight(hypervisor, total_cores_used, 0, 0)
0,
0)
def calculate_migration_efficacy(self): def calculate_migration_efficacy(self):
"""Calculate migration efficacy """Calculate migration efficacy
@@ -319,16 +289,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
else: else:
return 0 return 0
def calculate_score_vm(self, vm, cluster_data_model): def calculate_score_vm(self, vm):
"""Calculate Score of virtual machine """Calculate Score of virtual machine
:param vm: the virtual machine :param vm: the virtual machine
:param cluster_data_model: the cluster model :param self.model: the cluster model
:return: score :return: score
""" """
if cluster_data_model is None:
raise exception.ClusterStateNotDefined()
vm_cpu_utilization = self.ceilometer. \ vm_cpu_utilization = self.ceilometer. \
statistic_aggregation( statistic_aggregation(
resource_id=vm.uuid, resource_id=vm.uuid,
@@ -345,13 +312,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
) )
vm_cpu_utilization = 100 vm_cpu_utilization = 100
cpu_capacity = cluster_data_model.get_resource_from_id( cpu_capacity = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(vm) resource.ResourceType.cpu_cores).get_capacity(vm)
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0) total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0)
return self.calculate_weight(cluster_data_model, vm, return self.calculate_weight(vm, total_cores_used, 0, 0)
total_cores_used, 0, 0)
def add_change_service_state(self, resource_id, state): def add_change_service_state(self, resource_id, state):
parameters = {'state': state} parameters = {'state': state}
@@ -371,48 +337,47 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
resource_id=resource_id, resource_id=resource_id,
input_parameters=parameters) input_parameters=parameters)
def score_of_nodes(self, cluster_data_model, score): def score_of_nodes(self, score):
"""Calculate score of nodes based on load by VMs""" """Calculate score of nodes based on load by VMs"""
for hypervisor_id in cluster_data_model.get_all_hypervisors(): for hypervisor_id in self.model.get_all_hypervisors():
hypervisor = cluster_data_model. \ hypervisor = self.model. \
get_hypervisor_from_id(hypervisor_id) get_hypervisor_from_id(hypervisor_id)
count = cluster_data_model.get_mapping(). \ count = self.model.get_mapping(). \
get_node_vms_from_id(hypervisor_id) get_node_vms_from_id(hypervisor_id)
if len(count) > 0: if len(count) > 0:
result = self.calculate_score_node(hypervisor, result = self.calculate_score_node(hypervisor)
cluster_data_model)
else: else:
''' the hypervisor has not VMs ''' # The hypervisor has not VMs
result = 0 result = 0
if len(count) > 0: if len(count) > 0:
score.append((hypervisor_id, result)) score.append((hypervisor_id, result))
return score return score
def node_and_vm_score(self, sorted_score, score, current_model): def node_and_vm_score(self, sorted_score, score):
"""Get List of VMs from Node""" """Get List of VMs from Node"""
node_to_release = sorted_score[len(score) - 1][0] node_to_release = sorted_score[len(score) - 1][0]
vms_to_mig = current_model.get_mapping().get_node_vms_from_id( vms_to_mig = self.model.get_mapping().get_node_vms_from_id(
node_to_release) node_to_release)
vm_score = [] vm_score = []
for vm_id in vms_to_mig: for vm_id in vms_to_mig:
vm = current_model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
if vm.state == vm_state.VMState.ACTIVE.value: if vm.state == vm_state.VMState.ACTIVE.value:
vm_score.append( vm_score.append(
(vm_id, self.calculate_score_vm(vm, current_model))) (vm_id, self.calculate_score_vm(vm)))
return node_to_release, vm_score return node_to_release, vm_score
def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor, def create_migration_vm(self, mig_vm, mig_src_hypervisor,
mig_dst_hypervisor): mig_dst_hypervisor):
"""Create migration VM """ """Create migration VM"""
if current_model.get_mapping().migrate_vm( if self.model.get_mapping().migrate_vm(
mig_vm, mig_src_hypervisor, mig_dst_hypervisor): mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
self.add_migration(mig_vm.uuid, 'live', self.add_migration(mig_vm.uuid, 'live',
mig_src_hypervisor.uuid, mig_src_hypervisor.uuid,
mig_dst_hypervisor.uuid) mig_dst_hypervisor.uuid)
if len(current_model.get_mapping().get_node_vms( if len(self.model.get_mapping().get_node_vms(
mig_src_hypervisor)) == 0: mig_src_hypervisor)) == 0:
self.add_change_service_state(mig_src_hypervisor. self.add_change_service_state(mig_src_hypervisor.
uuid, uuid,
@@ -420,24 +385,22 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
DISABLED.value) DISABLED.value)
self.number_of_released_nodes += 1 self.number_of_released_nodes += 1
def calculate_num_migrations(self, sorted_vms, current_model, def calculate_num_migrations(self, sorted_vms, node_to_release,
node_to_release, sorted_score): sorted_score):
number_migrations = 0 number_migrations = 0
for vm in sorted_vms: for vm in sorted_vms:
for j in range(0, len(sorted_score)): for j in range(0, len(sorted_score)):
mig_vm = current_model.get_vm_from_id(vm[0]) mig_vm = self.model.get_vm_from_id(vm[0])
mig_src_hypervisor = current_model.get_hypervisor_from_id( mig_src_hypervisor = self.model.get_hypervisor_from_id(
node_to_release) node_to_release)
mig_dst_hypervisor = current_model.get_hypervisor_from_id( mig_dst_hypervisor = self.model.get_hypervisor_from_id(
sorted_score[j][0]) sorted_score[j][0])
result = self.check_migration(current_model, result = self.check_migration(
mig_src_hypervisor, mig_src_hypervisor, mig_dst_hypervisor, mig_vm)
mig_dst_hypervisor, mig_vm)
if result: if result:
self.create_migration_vm( self.create_migration_vm(
current_model, mig_vm, mig_vm, mig_src_hypervisor, mig_dst_hypervisor)
mig_src_hypervisor, mig_dst_hypervisor)
number_migrations += 1 number_migrations += 1
break break
return number_migrations return number_migrations
@@ -450,28 +413,26 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
else: else:
return unsuccessful_migration + 1 return unsuccessful_migration + 1
def execute(self, original_model): def pre_execute(self):
LOG.info(_LI("Initializing Sercon Consolidation")) LOG.info(_LI("Initializing Sercon Consolidation"))
if self.model is None:
if original_model is None:
raise exception.ClusterStateNotDefined() raise exception.ClusterStateNotDefined()
def do_execute(self):
# todo(jed) clone model # todo(jed) clone model
current_model = original_model
self.efficacy = 100 self.efficacy = 100
unsuccessful_migration = 0 unsuccessful_migration = 0
first_migration = True first_migration = True
size_cluster = len(current_model.get_all_hypervisors()) size_cluster = len(self.model.get_all_hypervisors())
if size_cluster == 0: if size_cluster == 0:
raise exception.ClusterEmpty() raise exception.ClusterEmpty()
self.compute_attempts(size_cluster) self.compute_attempts(size_cluster)
for hypervisor_id in current_model.get_all_hypervisors(): for hypervisor_id in self.model.get_all_hypervisors():
hypervisor = current_model.get_hypervisor_from_id(hypervisor_id) hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
count = current_model.get_mapping(). \ count = self.model.get_mapping(). \
get_node_vms_from_id(hypervisor_id) get_node_vms_from_id(hypervisor_id)
if len(count) == 0: if len(count) == 0:
if hypervisor.state == hyper_state.HypervisorState.ENABLED: if hypervisor.state == hyper_state.HypervisorState.ENABLED:
@@ -487,13 +448,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
first_migration = False first_migration = False
score = [] score = []
score = self.score_of_nodes(current_model, score) score = self.score_of_nodes(score)
''' sort compute nodes by Score decreasing ''''' # Sort compute nodes by Score decreasing
sorted_score = sorted(score, reverse=True, key=lambda x: (x[1])) sorted_score = sorted(score, reverse=True, key=lambda x: (x[1]))
LOG.debug("Hypervisor(s) BFD {0}".format(sorted_score)) LOG.debug("Hypervisor(s) BFD %s", sorted_score)
''' get Node to be released ''' # Get Node to be released
if len(score) == 0: if len(score) == 0:
LOG.warning(_LW( LOG.warning(_LW(
"The workloads of the compute nodes" "The workloads of the compute nodes"
@@ -501,15 +462,15 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
break break
node_to_release, vm_score = self.node_and_vm_score( node_to_release, vm_score = self.node_and_vm_score(
sorted_score, score, current_model) sorted_score, score)
''' sort VMs by Score ''' # Sort VMs by Score
sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1])) sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1]))
# BFD: Best Fit Decrease # BFD: Best Fit Decrease
LOG.debug("VM(s) BFD {0}".format(sorted_vms)) LOG.debug("VM(s) BFD %s", sorted_vms)
migrations = self.calculate_num_migrations( migrations = self.calculate_num_migrations(
sorted_vms, current_model, node_to_release, sorted_score) sorted_vms, node_to_release, sorted_score)
unsuccessful_migration = self.unsuccessful_migration_actualization( unsuccessful_migration = self.unsuccessful_migration_actualization(
migrations, unsuccessful_migration) migrations, unsuccessful_migration)
@@ -519,6 +480,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
"efficacy": self.efficacy "efficacy": self.efficacy
} }
LOG.debug(infos) LOG.debug(infos)
self.solution.model = current_model
self.solution.efficacy = self.efficacy def post_execute(self):
return self.solution pass

View File

@@ -19,6 +19,7 @@
from oslo_log import log from oslo_log import log
from watcher._i18n import _ from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.strategy.strategies import base from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@@ -48,16 +49,11 @@ class DummyStrategy(base.DummyBaseStrategy):
NOP = "nop" NOP = "nop"
SLEEP = "sleep" SLEEP = "sleep"
def __init__(self, config=None, osc=None): def pre_execute(self):
"""Dummy Strategy implemented for demo and testing purposes if self.model is None:
raise exception.ClusterStateNotDefined()
:param config: A mapping containing the configuration of this strategy def do_execute(self):
:type config: dict
:param osc: :py:class:`~.OpenStackClients` instance
"""
super(DummyStrategy, self).__init__(config, osc)
def execute(self, original_model):
LOG.debug("Executing Dummy strategy") LOG.debug("Executing Dummy strategy")
parameters = {'message': 'hello World'} parameters = {'message': 'hello World'}
self.solution.add_action(action_type=self.NOP, self.solution.add_action(action_type=self.NOP,
@@ -69,7 +65,9 @@ class DummyStrategy(base.DummyBaseStrategy):
self.solution.add_action(action_type=self.SLEEP, self.solution.add_action(action_type=self.SLEEP,
input_parameters={'duration': 5.0}) input_parameters={'duration': 5.0})
return self.solution
def post_execute(self):
pass
@classmethod @classmethod
def get_name(cls): def get_name(cls):

View File

@@ -30,7 +30,7 @@ telemetries to measure thermal/workload status of server.
from oslo_log import log from oslo_log import log
from watcher._i18n import _, _LE from watcher._i18n import _, _LE, _LI
from watcher.common import exception as wexc from watcher.common import exception as wexc
from watcher.decision_engine.model import resource from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state from watcher.decision_engine.model import vm_state
@@ -78,7 +78,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
MIGRATION = "migrate" MIGRATION = "migrate"
def __init__(self, config=None, osc=None): def __init__(self, config, osc=None):
"""Outlet temperature control using live migration """Outlet temperature control using live migration
:param config: A mapping containing the configuration of this strategy :param config: A mapping containing the configuration of this strategy
@@ -116,26 +116,25 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
def ceilometer(self, c): def ceilometer(self, c):
self._ceilometer = c self._ceilometer = c
def calc_used_res(self, cluster_data_model, hypervisor, cpu_capacity, def calc_used_res(self, hypervisor, cpu_capacity,
memory_capacity, disk_capacity): memory_capacity, disk_capacity):
'''calculate the used vcpus, memory and disk based on VM flavors''' """Calculate the used vcpus, memory and disk based on VM flavors"""
vms = cluster_data_model.get_mapping().get_node_vms(hypervisor) vms = self.model.get_mapping().get_node_vms(hypervisor)
vcpus_used = 0 vcpus_used = 0
memory_mb_used = 0 memory_mb_used = 0
disk_gb_used = 0 disk_gb_used = 0
if len(vms) > 0: if len(vms) > 0:
for vm_id in vms: for vm_id in vms:
vm = cluster_data_model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
vcpus_used += cpu_capacity.get_capacity(vm) vcpus_used += cpu_capacity.get_capacity(vm)
memory_mb_used += memory_capacity.get_capacity(vm) memory_mb_used += memory_capacity.get_capacity(vm)
disk_gb_used += disk_capacity.get_capacity(vm) disk_gb_used += disk_capacity.get_capacity(vm)
return vcpus_used, memory_mb_used, disk_gb_used return vcpus_used, memory_mb_used, disk_gb_used
def group_hosts_by_outlet_temp(self, cluster_data_model): def group_hosts_by_outlet_temp(self):
"""Group hosts based on outlet temp meters""" """Group hosts based on outlet temp meters"""
hypervisors = self.model.get_all_hypervisors()
hypervisors = cluster_data_model.get_all_hypervisors()
size_cluster = len(hypervisors) size_cluster = len(hypervisors)
if size_cluster == 0: if size_cluster == 0:
raise wexc.ClusterEmpty() raise wexc.ClusterEmpty()
@@ -143,7 +142,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
hosts_need_release = [] hosts_need_release = []
hosts_target = [] hosts_target = []
for hypervisor_id in hypervisors: for hypervisor_id in hypervisors:
hypervisor = cluster_data_model.get_hypervisor_from_id( hypervisor = self.model.get_hypervisor_from_id(
hypervisor_id) hypervisor_id)
resource_id = hypervisor.uuid resource_id = hypervisor.uuid
@@ -166,37 +165,35 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
hosts_target.append(hvmap) hosts_target.append(hvmap)
return hosts_need_release, hosts_target return hosts_need_release, hosts_target
def choose_vm_to_migrate(self, cluster_data_model, hosts): def choose_vm_to_migrate(self, hosts):
"""pick up an active vm instance to migrate from provided hosts""" """Pick up an active vm instance to migrate from provided hosts"""
for hvmap in hosts: for hvmap in hosts:
mig_src_hypervisor = hvmap['hv'] mig_src_hypervisor = hvmap['hv']
vms_of_src = cluster_data_model.get_mapping().get_node_vms( vms_of_src = self.model.get_mapping().get_node_vms(
mig_src_hypervisor) mig_src_hypervisor)
if len(vms_of_src) > 0: if len(vms_of_src) > 0:
for vm_id in vms_of_src: for vm_id in vms_of_src:
try: try:
# select the first active VM to migrate # select the first active VM to migrate
vm = cluster_data_model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value: if vm.state != vm_state.VMState.ACTIVE.value:
LOG.info(_LE("VM not active, skipped: %s"), LOG.info(_LE("VM not active, skipped: %s"),
vm.uuid) vm.uuid)
continue continue
return mig_src_hypervisor, vm return mig_src_hypervisor, vm
except wexc.InstanceNotFound as e: except wexc.InstanceNotFound as e:
LOG.info("VM not found Error: %s" % e.message) LOG.exception(e)
pass LOG.info(_LI("VM not found"))
return None return None
def filter_dest_servers(self, cluster_data_model, hosts, vm_to_migrate): def filter_dest_servers(self, hosts, vm_to_migrate):
"""Only return hosts with sufficient available resources""" """Only return hosts with sufficient available resources"""
cpu_capacity = self.model.get_resource_from_id(
cpu_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.cpu_cores) resource.ResourceType.cpu_cores)
disk_capacity = cluster_data_model.get_resource_from_id( disk_capacity = self.model.get_resource_from_id(
resource.ResourceType.disk) resource.ResourceType.disk)
memory_capacity = cluster_data_model.get_resource_from_id( memory_capacity = self.model.get_resource_from_id(
resource.ResourceType.memory) resource.ResourceType.memory)
required_cores = cpu_capacity.get_capacity(vm_to_migrate) required_cores = cpu_capacity.get_capacity(vm_to_migrate)
@@ -209,8 +206,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
host = hvmap['hv'] host = hvmap['hv']
# available # available
cores_used, mem_used, disk_used = self.calc_used_res( cores_used, mem_used, disk_used = self.calc_used_res(
cluster_data_model, host, cpu_capacity, memory_capacity, host, cpu_capacity, memory_capacity, disk_capacity)
disk_capacity)
cores_available = cpu_capacity.get_capacity(host) - cores_used cores_available = cpu_capacity.get_capacity(host) - cores_used
disk_available = disk_capacity.get_capacity(host) - disk_used disk_available = disk_capacity.get_capacity(host) - disk_used
mem_available = memory_capacity.get_capacity(host) - mem_used mem_available = memory_capacity.get_capacity(host) - mem_used
@@ -221,15 +217,14 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
return dest_servers return dest_servers
def execute(self, original_model): def pre_execute(self):
LOG.debug("Initializing Outlet temperature strategy") LOG.debug("Initializing Outlet temperature strategy")
if original_model is None: if self.model is None:
raise wexc.ClusterStateNotDefined() raise wexc.ClusterStateNotDefined()
current_model = original_model def do_execute(self):
hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp( hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp()
current_model)
if len(hosts_need_release) == 0: if len(hosts_need_release) == 0:
# TODO(zhenzanz): return something right if there's no hot servers # TODO(zhenzanz): return something right if there's no hot servers
@@ -245,16 +240,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
reverse=True, reverse=True,
key=lambda x: (x["outlet_temp"])) key=lambda x: (x["outlet_temp"]))
vm_to_migrate = self.choose_vm_to_migrate(current_model, vm_to_migrate = self.choose_vm_to_migrate(hosts_need_release)
hosts_need_release)
# calculate the vm's cpu cores,memory,disk needs # calculate the vm's cpu cores,memory,disk needs
if vm_to_migrate is None: if vm_to_migrate is None:
return self.solution return self.solution
mig_src_hypervisor, vm_src = vm_to_migrate mig_src_hypervisor, vm_src = vm_to_migrate
dest_servers = self.filter_dest_servers(current_model, dest_servers = self.filter_dest_servers(hosts_target, vm_src)
hosts_target,
vm_src)
# sort the filtered result by outlet temp # sort the filtered result by outlet temp
# pick up the lowest one as dest server # pick up the lowest one as dest server
if len(dest_servers) == 0: if len(dest_servers) == 0:
@@ -267,9 +259,8 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
# always use the host with lowerest outlet temperature # always use the host with lowerest outlet temperature
mig_dst_hypervisor = dest_servers[0]['hv'] mig_dst_hypervisor = dest_servers[0]['hv']
# generate solution to migrate the vm to the dest server, # generate solution to migrate the vm to the dest server,
if current_model.get_mapping().migrate_vm(vm_src, if self.model.get_mapping().migrate_vm(
mig_src_hypervisor, vm_src, mig_src_hypervisor, mig_dst_hypervisor):
mig_dst_hypervisor):
parameters = {'migration_type': 'live', parameters = {'migration_type': 'live',
'src_hypervisor': mig_src_hypervisor.uuid, 'src_hypervisor': mig_src_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid} 'dst_hypervisor': mig_dst_hypervisor.uuid}
@@ -277,6 +268,6 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
resource_id=vm_src.uuid, resource_id=vm_src.uuid,
input_parameters=parameters) input_parameters=parameters)
self.solution.model = current_model def post_execute(self):
self.solution.model = self.model
return self.solution # TODO(v-francoise): Add the indicators to the solution

View File

@@ -84,7 +84,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
https://github.com/openstack/watcher-specs/blob/master/specs/mitaka/implemented/zhaw-load-consolidation.rst https://github.com/openstack/watcher-specs/blob/master/specs/mitaka/implemented/zhaw-load-consolidation.rst
""" # noqa """ # noqa
def __init__(self, config=None, osc=None): def __init__(self, config, osc=None):
super(VMWorkloadConsolidation, self).__init__(config, osc) super(VMWorkloadConsolidation, self).__init__(config, osc)
self._ceilometer = None self._ceilometer = None
self.number_of_migrations = 0 self.number_of_migrations = 0
@@ -121,8 +121,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
""" """
if isinstance(state, six.string_types): if isinstance(state, six.string_types):
return state return state
elif (type(state) == hyper_state.HypervisorState or elif isinstance(state, (vm_state.VMState,
type(state) == vm_state.VMState): hyper_state.HypervisorState)):
return state.value return state.value
else: else:
LOG.error(_LE('Unexpexted resource state type, ' LOG.error(_LE('Unexpexted resource state type, '
@@ -171,12 +171,10 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
vm_state_str = self.get_state_str(vm.state) vm_state_str = self.get_state_str(vm.state)
if vm_state_str != vm_state.VMState.ACTIVE.value: if vm_state_str != vm_state.VMState.ACTIVE.value:
''' # Watcher curently only supports live VM migration and block live
Watcher curently only supports live VM migration and block live # VM migration which both requires migrated VM to be active.
VM migration which both requires migrated VM to be active. # When supported, the cold migration may be used as a fallback
When supported, the cold migration may be used as a fallback # migration mechanism to move non active VMs.
migration mechanism to move non active VMs.
'''
LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, ' LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, '
'state=%(vm_state)s.'), 'state=%(vm_state)s.'),
vm_uuid=vm_uuid, vm_uuid=vm_uuid,
@@ -209,13 +207,13 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
if len(model.get_mapping().get_node_vms(hypervisor)) == 0: if len(model.get_mapping().get_node_vms(hypervisor)) == 0:
self.add_action_deactivate_hypervisor(hypervisor) self.add_action_deactivate_hypervisor(hypervisor)
def get_prediction_model(self, model): def get_prediction_model(self):
"""Return a deepcopy of a model representing current cluster state. """Return a deepcopy of a model representing current cluster state.
:param model: model_root object :param model: model_root object
:return: model_root object :return: model_root object
""" """
return deepcopy(model) return deepcopy(self.model)
def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'): def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a VM. """Collect cpu, ram and disk utilization statistics of a VM.
@@ -334,7 +332,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
:param model: model_root object :param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>} :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
""" """
hypervisors = model.get_all_hypervisors().values() hypervisors = self.model.get_all_hypervisors().values()
rcu = {} rcu = {}
counters = {} counters = {}
for hypervisor in hypervisors: for hypervisor in hypervisors:
@@ -452,9 +450,11 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
key=lambda x: self.get_hypervisor_utilization(x, model)['cpu']) key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
for hypervisor in reversed(sorted_hypervisors): for hypervisor in reversed(sorted_hypervisors):
if self.is_overloaded(hypervisor, model, cc): if self.is_overloaded(hypervisor, model, cc):
for vm in sorted(model.get_mapping().get_node_vms(hypervisor), for vm in sorted(
key=lambda x: self.get_vm_utilization( model.get_mapping().get_node_vms(hypervisor),
x, model)['cpu']): key=lambda x: self.get_vm_utilization(
x, model)['cpu']
):
for dst_hypervisor in reversed(sorted_hypervisors): for dst_hypervisor in reversed(sorted_hypervisors):
if self.vm_fits(vm, dst_hypervisor, model, cc): if self.vm_fits(vm, dst_hypervisor, model, cc):
self.add_migration(vm, hypervisor, self.add_migration(vm, hypervisor,
@@ -498,7 +498,11 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
dsc -= 1 dsc -= 1
asc += 1 asc += 1
def execute(self, original_model): def pre_execute(self):
if self.model is None:
raise exception.ClusterStateNotDefined()
def do_execute(self):
"""Execute strategy. """Execute strategy.
This strategy produces a solution resulting in more This strategy produces a solution resulting in more
@@ -513,7 +517,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
:param original_model: root_model object :param original_model: root_model object
""" """
LOG.info(_LI('Executing Smart Strategy')) LOG.info(_LI('Executing Smart Strategy'))
model = self.get_prediction_model(original_model) model = self.get_prediction_model(self.model)
rcu = self.get_relative_cluster_utilization(model) rcu = self.get_relative_cluster_utilization(model)
self.ceilometer_vm_data_cache = dict() self.ceilometer_vm_data_cache = dict()
@@ -545,4 +549,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
self.solution.model = model self.solution.model = model
self.solution.efficacy = rcu_after['cpu'] self.solution.efficacy = rcu_after['cpu']
return self.solution def post_execute(self):
# TODO(v-francoise): Add the indicators to the solution
pass

View File

@@ -28,7 +28,7 @@ from watcher.metrics_engine.cluster_history import ceilometer as ceil
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class WorkloadBalance(base.BaseStrategy): class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
"""[PoC]Workload balance using live migration """[PoC]Workload balance using live migration
*Description* *Description*
@@ -68,10 +68,12 @@ class WorkloadBalance(base.BaseStrategy):
MIGRATION = "migrate" MIGRATION = "migrate"
def __init__(self, config=None, osc=None): def __init__(self, config, osc=None):
"""Using live migration """Workload balance using live migration
:param osc: an OpenStackClients object :param config: A mapping containing the configuration of this strategy
:type config: :py:class:`~.Struct` instance
:param osc: :py:class:`~.OpenStackClients` instance
""" """
super(WorkloadBalance, self).__init__(config, osc) super(WorkloadBalance, self).__init__(config, osc)
# the migration plan will be triggered when the CPU utlization % # the migration plan will be triggered when the CPU utlization %
@@ -104,44 +106,32 @@ class WorkloadBalance(base.BaseStrategy):
def get_translatable_display_name(cls): def get_translatable_display_name(cls):
return "workload balance migration strategy" return "workload balance migration strategy"
@classmethod def calculate_used_resource(self, hypervisor, cap_cores, cap_mem,
def get_goal_name(cls):
return "WORKLOAD_OPTIMIZATION"
@classmethod
def get_goal_display_name(cls):
return _("Workload optimization")
@classmethod
def get_translatable_goal_display_name(cls):
return "Workload optimization"
def calculate_used_resource(self, model, hypervisor, cap_cores, cap_mem,
cap_disk): cap_disk):
'''calculate the used vcpus, memory and disk based on VM flavors''' """Calculate the used vcpus, memory and disk based on VM flavors"""
vms = model.get_mapping().get_node_vms(hypervisor) vms = self.model.get_mapping().get_node_vms(hypervisor)
vcpus_used = 0 vcpus_used = 0
memory_mb_used = 0 memory_mb_used = 0
disk_gb_used = 0 disk_gb_used = 0
for vm_id in vms: for vm_id in vms:
vm = model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
vcpus_used += cap_cores.get_capacity(vm) vcpus_used += cap_cores.get_capacity(vm)
memory_mb_used += cap_mem.get_capacity(vm) memory_mb_used += cap_mem.get_capacity(vm)
disk_gb_used += cap_disk.get_capacity(vm) disk_gb_used += cap_disk.get_capacity(vm)
return vcpus_used, memory_mb_used, disk_gb_used return vcpus_used, memory_mb_used, disk_gb_used
def choose_vm_to_migrate(self, model, hosts, avg_workload, workload_cache): def choose_vm_to_migrate(self, hosts, avg_workload, workload_cache):
"""pick up an active vm instance to migrate from provided hosts """Pick up an active vm instance to migrate from provided hosts
:param model: it's the origin_model passed from 'execute' function
:param hosts: the array of dict which contains hypervisor object :param hosts: the array of dict which contains hypervisor object
:param avg_workload: the average workload value of all hypervisors :param avg_workload: the average workload value of all hypervisors
:param workload_cache: the map contains vm to workload mapping :param workload_cache: the map contains vm to workload mapping
""" """
for hvmap in hosts: for hvmap in hosts:
source_hypervisor = hvmap['hv'] source_hypervisor = hvmap['hv']
source_vms = model.get_mapping().get_node_vms(source_hypervisor) source_vms = self.model.get_mapping().get_node_vms(
source_hypervisor)
if source_vms: if source_vms:
delta_workload = hvmap['workload'] - avg_workload delta_workload = hvmap['workload'] - avg_workload
min_delta = 1000000 min_delta = 1000000
@@ -149,7 +139,7 @@ class WorkloadBalance(base.BaseStrategy):
for vm_id in source_vms: for vm_id in source_vms:
try: try:
# select the first active VM to migrate # select the first active VM to migrate
vm = model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value: if vm.state != vm_state.VMState.ACTIVE.value:
LOG.debug("VM not active, skipped: %s", LOG.debug("VM not active, skipped: %s",
vm.uuid) vm.uuid)
@@ -161,18 +151,20 @@ class WorkloadBalance(base.BaseStrategy):
except wexc.InstanceNotFound: except wexc.InstanceNotFound:
LOG.error(_LE("VM not found Error: %s"), vm_id) LOG.error(_LE("VM not found Error: %s"), vm_id)
if instance_id: if instance_id:
return source_hypervisor, model.get_vm_from_id(instance_id) return source_hypervisor, self.model.get_vm_from_id(
instance_id)
else: else:
LOG.info(_LI("VM not found from hypervisor: %s"), LOG.info(_LI("VM not found from hypervisor: %s"),
source_hypervisor.uuid) source_hypervisor.uuid)
def filter_destination_hosts(self, model, hosts, vm_to_migrate, def filter_destination_hosts(self, hosts, vm_to_migrate,
avg_workload, workload_cache): avg_workload, workload_cache):
'''Only return hosts with sufficient available resources''' '''Only return hosts with sufficient available resources'''
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) cap_cores = self.model.get_resource_from_id(
cap_disk = model.get_resource_from_id(resource.ResourceType.disk) resource.ResourceType.cpu_cores)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory) cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = self.model.get_resource_from_id(resource.ResourceType.memory)
required_cores = cap_cores.get_capacity(vm_to_migrate) required_cores = cap_cores.get_capacity(vm_to_migrate)
required_disk = cap_disk.get_capacity(vm_to_migrate) required_disk = cap_disk.get_capacity(vm_to_migrate)
@@ -186,20 +178,22 @@ class WorkloadBalance(base.BaseStrategy):
workload = hvmap['workload'] workload = hvmap['workload']
# calculate the available resources # calculate the available resources
cores_used, mem_used, disk_used = self.calculate_used_resource( cores_used, mem_used, disk_used = self.calculate_used_resource(
model, host, cap_cores, cap_mem, cap_disk) host, cap_cores, cap_mem, cap_disk)
cores_available = cap_cores.get_capacity(host) - cores_used cores_available = cap_cores.get_capacity(host) - cores_used
disk_available = cap_disk.get_capacity(host) - disk_used disk_available = cap_disk.get_capacity(host) - disk_used
mem_available = cap_mem.get_capacity(host) - mem_used mem_available = cap_mem.get_capacity(host) - mem_used
if (cores_available >= required_cores and if (
disk_available >= required_disk and cores_available >= required_cores and
mem_available >= required_mem and disk_available >= required_disk and
(src_vm_workload + workload) < self.threshold / 100 * mem_available >= required_mem and
cap_cores.get_capacity(host)): (src_vm_workload + workload) < self.threshold / 100 *
cap_cores.get_capacity(host)
):
destination_hosts.append(hvmap) destination_hosts.append(hvmap)
return destination_hosts return destination_hosts
def group_hosts_by_cpu_util(self, model): def group_hosts_by_cpu_util(self):
"""Calculate the workloads of each hypervisor """Calculate the workloads of each hypervisor
try to find out the hypervisors which have reached threshold try to find out the hypervisors which have reached threshold
@@ -208,12 +202,13 @@ class WorkloadBalance(base.BaseStrategy):
and also generate the VM workload map. and also generate the VM workload map.
""" """
hypervisors = model.get_all_hypervisors() hypervisors = self.model.get_all_hypervisors()
cluster_size = len(hypervisors) cluster_size = len(hypervisors)
if not hypervisors: if not hypervisors:
raise wexc.ClusterEmpty() raise wexc.ClusterEmpty()
# get cpu cores capacity of hypervisors and vms # get cpu cores capacity of hypervisors and vms
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) cap_cores = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores)
overload_hosts = [] overload_hosts = []
nonoverload_hosts = [] nonoverload_hosts = []
# total workload of cluster # total workload of cluster
@@ -222,19 +217,20 @@ class WorkloadBalance(base.BaseStrategy):
# use workload_cache to store the workload of VMs for reuse purpose # use workload_cache to store the workload of VMs for reuse purpose
workload_cache = {} workload_cache = {}
for hypervisor_id in hypervisors: for hypervisor_id in hypervisors:
hypervisor = model.get_hypervisor_from_id(hypervisor_id) hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
vms = model.get_mapping().get_node_vms(hypervisor) vms = self.model.get_mapping().get_node_vms(hypervisor)
hypervisor_workload = 0.0 hypervisor_workload = 0.0
for vm_id in vms: for vm_id in vms:
vm = model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
try: try:
cpu_util = self.ceilometer.statistic_aggregation( cpu_util = self.ceilometer.statistic_aggregation(
resource_id=vm_id, resource_id=vm_id,
meter_name=self._meter, meter_name=self._meter,
period=self._period, period=self._period,
aggregate='avg') aggregate='avg')
except Exception as e: except Exception as exc:
LOG.error(_LE("Can not get cpu_util: %s"), e.message) LOG.exception(exc)
LOG.error(_LE("Can not get cpu_util"))
continue continue
if cpu_util is None: if cpu_util is None:
LOG.debug("%s: cpu_util is None", vm_id) LOG.debug("%s: cpu_util is None", vm_id)
@@ -260,15 +256,23 @@ class WorkloadBalance(base.BaseStrategy):
return overload_hosts, nonoverload_hosts, avg_workload, workload_cache return overload_hosts, nonoverload_hosts, avg_workload, workload_cache
def execute(self, origin_model): def pre_execute(self):
"""Pre-execution phase
This can be used to fetch some pre-requisites or data.
"""
LOG.info(_LI("Initializing Workload Balance Strategy")) LOG.info(_LI("Initializing Workload Balance Strategy"))
if origin_model is None: if self.model is None:
raise wexc.ClusterStateNotDefined() raise wexc.ClusterStateNotDefined()
current_model = origin_model def do_execute(self):
"""Strategy execution phase
This phase is where you should put the main logic of your strategy.
"""
src_hypervisors, target_hypervisors, avg_workload, workload_cache = ( src_hypervisors, target_hypervisors, avg_workload, workload_cache = (
self.group_hosts_by_cpu_util(current_model)) self.group_hosts_by_cpu_util())
if not src_hypervisors: if not src_hypervisors:
LOG.debug("No hosts require optimization") LOG.debug("No hosts require optimization")
@@ -286,19 +290,14 @@ class WorkloadBalance(base.BaseStrategy):
reverse=True, reverse=True,
key=lambda x: (x[self.METER_NAME])) key=lambda x: (x[self.METER_NAME]))
vm_to_migrate = self.choose_vm_to_migrate(current_model, vm_to_migrate = self.choose_vm_to_migrate(
src_hypervisors, src_hypervisors, avg_workload, workload_cache)
avg_workload,
workload_cache)
if not vm_to_migrate: if not vm_to_migrate:
return self.solution return self.solution
source_hypervisor, vm_src = vm_to_migrate source_hypervisor, vm_src = vm_to_migrate
# find the hosts that have enough resource for the VM to be migrated # find the hosts that have enough resource for the VM to be migrated
destination_hosts = self.filter_destination_hosts(current_model, destination_hosts = self.filter_destination_hosts(
target_hypervisors, target_hypervisors, vm_src, avg_workload, workload_cache)
vm_src,
avg_workload,
workload_cache)
# sort the filtered result by workload # sort the filtered result by workload
# pick up the lowest one as dest server # pick up the lowest one as dest server
if not destination_hosts: if not destination_hosts:
@@ -311,14 +310,18 @@ class WorkloadBalance(base.BaseStrategy):
# always use the host with lowerest CPU utilization # always use the host with lowerest CPU utilization
mig_dst_hypervisor = destination_hosts[0]['hv'] mig_dst_hypervisor = destination_hosts[0]['hv']
# generate solution to migrate the vm to the dest server, # generate solution to migrate the vm to the dest server,
if current_model.get_mapping().migrate_vm(vm_src, if self.model.get_mapping().migrate_vm(vm_src, source_hypervisor,
source_hypervisor, mig_dst_hypervisor):
mig_dst_hypervisor):
parameters = {'migration_type': 'live', parameters = {'migration_type': 'live',
'src_hypervisor': source_hypervisor.uuid, 'src_hypervisor': source_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid} 'dst_hypervisor': mig_dst_hypervisor.uuid}
self.solution.add_action(action_type=self.MIGRATION, self.solution.add_action(action_type=self.MIGRATION,
resource_id=vm_src.uuid, resource_id=vm_src.uuid,
input_parameters=parameters) input_parameters=parameters)
self.solution.model = current_model
return self.solution def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
self.solution.model = self.model

View File

@@ -108,7 +108,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
MIGRATION = "migrate" MIGRATION = "migrate"
MEMOIZE = _set_memoize(CONF) MEMOIZE = _set_memoize(CONF)
def __init__(self, config=None, osc=None): def __init__(self, config, osc=None):
super(WorkloadStabilization, self).__init__(config, osc) super(WorkloadStabilization, self).__init__(config, osc)
self._ceilometer = None self._ceilometer = None
self._nova = None self._nova = None
@@ -164,17 +164,16 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus)) return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus))
@MEMOIZE @MEMOIZE
def get_vm_load(self, vm_uuid, current_model): def get_vm_load(self, vm_uuid):
"""Gathering vm load through ceilometer statistic. """Gathering vm load through ceilometer statistic.
:param vm_uuid: vm for which statistic is gathered. :param vm_uuid: vm for which statistic is gathered.
:param current_model: the cluster model
:return: dict :return: dict
""" """
LOG.debug(_LI('get_vm_load started')) LOG.debug(_LI('get_vm_load started'))
vm_vcpus = current_model.get_resource_from_id( vm_vcpus = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity( resource.ResourceType.cpu_cores).get_capacity(
current_model.get_vm_from_id(vm_uuid)) self.model.get_vm_from_id(vm_uuid))
vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus} vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus}
for meter in self.metrics: for meter in self.metrics:
avg_meter = self.ceilometer.statistic_aggregation( avg_meter = self.ceilometer.statistic_aggregation(
@@ -189,25 +188,25 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
vm_load[meter] = avg_meter vm_load[meter] = avg_meter
return vm_load return vm_load
def normalize_hosts_load(self, hosts, current_model): def normalize_hosts_load(self, hosts):
normalized_hosts = deepcopy(hosts) normalized_hosts = deepcopy(hosts)
for host in normalized_hosts: for host in normalized_hosts:
if 'memory.resident' in normalized_hosts[host]: if 'memory.resident' in normalized_hosts[host]:
h_memory = current_model.get_resource_from_id( h_memory = self.model.get_resource_from_id(
resource.ResourceType.memory).get_capacity( resource.ResourceType.memory).get_capacity(
current_model.get_hypervisor_from_id(host)) self.model.get_hypervisor_from_id(host))
normalized_hosts[host]['memory.resident'] /= float(h_memory) normalized_hosts[host]['memory.resident'] /= float(h_memory)
return normalized_hosts return normalized_hosts
def get_hosts_load(self, current_model): def get_hosts_load(self):
"""Get load of every host by gathering vms load""" """Get load of every host by gathering vms load"""
hosts_load = {} hosts_load = {}
for hypervisor_id in current_model.get_all_hypervisors(): for hypervisor_id in self.model.get_all_hypervisors():
hosts_load[hypervisor_id] = {} hosts_load[hypervisor_id] = {}
host_vcpus = current_model.get_resource_from_id( host_vcpus = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity( resource.ResourceType.cpu_cores).get_capacity(
current_model.get_hypervisor_from_id(hypervisor_id)) self.model.get_hypervisor_from_id(hypervisor_id))
hosts_load[hypervisor_id]['vcpus'] = host_vcpus hosts_load[hypervisor_id]['vcpus'] = host_vcpus
for metric in self.metrics: for metric in self.metrics:
@@ -250,8 +249,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
" for %s in weight dict.") % metric) " for %s in weight dict.") % metric)
return weighted_sd return weighted_sd
def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id, def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id):
current_model):
"""Calculate migration case """Calculate migration case
Return list of standard deviation values, that appearing in case of Return list of standard deviation values, that appearing in case of
@@ -260,12 +258,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
:param vm_id: the virtual machine :param vm_id: the virtual machine
:param src_hp_id: the source hypervisor id :param src_hp_id: the source hypervisor id
:param dst_hp_id: the destination hypervisor id :param dst_hp_id: the destination hypervisor id
:param current_model: the cluster model
:return: list of standard deviation values :return: list of standard deviation values
""" """
migration_case = [] migration_case = []
new_hosts = deepcopy(hosts) new_hosts = deepcopy(hosts)
vm_load = self.get_vm_load(vm_id, current_model) vm_load = self.get_vm_load(vm_id)
d_host_vcpus = new_hosts[dst_hp_id]['vcpus'] d_host_vcpus = new_hosts[dst_hp_id]['vcpus']
s_host_vcpus = new_hosts[src_hp_id]['vcpus'] s_host_vcpus = new_hosts[src_hp_id]['vcpus']
for metric in self.metrics: for metric in self.metrics:
@@ -279,13 +276,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
else: else:
new_hosts[src_hp_id][metric] -= vm_load[metric] new_hosts[src_hp_id][metric] -= vm_load[metric]
new_hosts[dst_hp_id][metric] += vm_load[metric] new_hosts[dst_hp_id][metric] += vm_load[metric]
normalized_hosts = self.normalize_hosts_load(new_hosts, current_model) normalized_hosts = self.normalize_hosts_load(new_hosts)
for metric in self.metrics: for metric in self.metrics:
migration_case.append(self.get_sd(normalized_hosts, metric)) migration_case.append(self.get_sd(normalized_hosts, metric))
migration_case.append(new_hosts) migration_case.append(new_hosts)
return migration_case return migration_case
def simulate_migrations(self, current_model, hosts): def simulate_migrations(self, hosts):
"""Make sorted list of pairs vm:dst_host""" """Make sorted list of pairs vm:dst_host"""
def yield_hypervisors(hypervisors): def yield_hypervisors(hypervisors):
ct = CONF['watcher_strategies.workload_stabilization'].retry_count ct = CONF['watcher_strategies.workload_stabilization'].retry_count
@@ -300,23 +297,22 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
yield hypervisors yield hypervisors
vm_host_map = [] vm_host_map = []
for source_hp_id in current_model.get_all_hypervisors(): for source_hp_id in self.model.get_all_hypervisors():
hypervisors = list(current_model.get_all_hypervisors()) hypervisors = list(self.model.get_all_hypervisors())
hypervisors.remove(source_hp_id) hypervisors.remove(source_hp_id)
hypervisor_list = yield_hypervisors(hypervisors) hypervisor_list = yield_hypervisors(hypervisors)
vms_id = current_model.get_mapping(). \ vms_id = self.model.get_mapping(). \
get_node_vms_from_id(source_hp_id) get_node_vms_from_id(source_hp_id)
for vm_id in vms_id: for vm_id in vms_id:
min_sd_case = {'value': len(self.metrics)} min_sd_case = {'value': len(self.metrics)}
vm = current_model.get_vm_from_id(vm_id) vm = self.model.get_vm_from_id(vm_id)
if vm.state not in [vm_state.VMState.ACTIVE.value, if vm.state not in [vm_state.VMState.ACTIVE.value,
vm_state.VMState.PAUSED.value]: vm_state.VMState.PAUSED.value]:
continue continue
for dst_hp_id in next(hypervisor_list): for dst_hp_id in next(hypervisor_list):
sd_case = self.calculate_migration_case(hosts, vm_id, sd_case = self.calculate_migration_case(hosts, vm_id,
source_hp_id, source_hp_id,
dst_hp_id, dst_hp_id)
current_model)
weighted_sd = self.calculate_weighted_sd(sd_case[:-1]) weighted_sd = self.calculate_weighted_sd(sd_case[:-1])
@@ -327,14 +323,14 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
break break
return sorted(vm_host_map, key=lambda x: x['value']) return sorted(vm_host_map, key=lambda x: x['value'])
def check_threshold(self, current_model): def check_threshold(self):
"""Check if cluster is needed in balancing""" """Check if cluster is needed in balancing"""
hosts_load = self.get_hosts_load(current_model) hosts_load = self.get_hosts_load()
normalized_load = self.normalize_hosts_load(hosts_load, current_model) normalized_load = self.normalize_hosts_load(hosts_load)
for metric in self.metrics: for metric in self.metrics:
metric_sd = self.get_sd(normalized_load, metric) metric_sd = self.get_sd(normalized_load, metric)
if metric_sd > float(self.thresholds[metric]): if metric_sd > float(self.thresholds[metric]):
return self.simulate_migrations(current_model, hosts_load) return self.simulate_migrations(hosts_load)
def add_migration(self, def add_migration(self,
resource_id, resource_id,
@@ -348,58 +344,57 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
resource_id=resource_id, resource_id=resource_id,
input_parameters=parameters) input_parameters=parameters)
def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor, def create_migration_vm(self, mig_vm, mig_src_hypervisor,
mig_dst_hypervisor): mig_dst_hypervisor):
"""Create migration VM """ """Create migration VM """
if current_model.get_mapping().migrate_vm( if self.model.get_mapping().migrate_vm(
mig_vm, mig_src_hypervisor, mig_dst_hypervisor): mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
self.add_migration(mig_vm.uuid, 'live', self.add_migration(mig_vm.uuid, 'live',
mig_src_hypervisor.uuid, mig_src_hypervisor.uuid,
mig_dst_hypervisor.uuid) mig_dst_hypervisor.uuid)
def migrate(self, current_model, vm_uuid, src_host, dst_host): def migrate(self, vm_uuid, src_host, dst_host):
mig_vm = current_model.get_vm_from_id(vm_uuid) mig_vm = self.model.get_vm_from_id(vm_uuid)
mig_src_hypervisor = current_model.get_hypervisor_from_id(src_host) mig_src_hypervisor = self.model.get_hypervisor_from_id(src_host)
mig_dst_hypervisor = current_model.get_hypervisor_from_id(dst_host) mig_dst_hypervisor = self.model.get_hypervisor_from_id(dst_host)
self.create_migration_vm(current_model, mig_vm, mig_src_hypervisor, self.create_migration_vm(mig_vm, mig_src_hypervisor,
mig_dst_hypervisor) mig_dst_hypervisor)
def fill_solution(self, current_model): def fill_solution(self):
self.solution.model = current_model self.solution.model = self.model
self.solution.efficacy = 100 self.solution.efficacy = 100
return self.solution return self.solution
def execute(self, orign_model): def pre_execute(self):
LOG.info(_LI("Initializing Workload Stabilization")) LOG.info(_LI("Initializing Workload Stabilization"))
current_model = orign_model
if orign_model is None: if self.model is None:
raise exception.ClusterStateNotDefined() raise exception.ClusterStateNotDefined()
migration = self.check_threshold(current_model) def do_execute(self):
migration = self.check_threshold()
if migration: if migration:
hosts_load = self.get_hosts_load(current_model) hosts_load = self.get_hosts_load()
min_sd = 1 min_sd = 1
balanced = False balanced = False
for vm_host in migration: for vm_host in migration:
dst_hp_disk = current_model.get_resource_from_id( dst_hp_disk = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity( resource.ResourceType.disk).get_capacity(
current_model.get_hypervisor_from_id(vm_host['host'])) self.model.get_hypervisor_from_id(vm_host['host']))
vm_disk = current_model.get_resource_from_id( vm_disk = self.model.get_resource_from_id(
resource.ResourceType.disk).get_capacity( resource.ResourceType.disk).get_capacity(
current_model.get_vm_from_id(vm_host['vm'])) self.model.get_vm_from_id(vm_host['vm']))
if vm_disk > dst_hp_disk: if vm_disk > dst_hp_disk:
continue continue
vm_load = self.calculate_migration_case(hosts_load, vm_load = self.calculate_migration_case(hosts_load,
vm_host['vm'], vm_host['vm'],
vm_host['s_host'], vm_host['s_host'],
vm_host['host'], vm_host['host'])
current_model)
weighted_sd = self.calculate_weighted_sd(vm_load[:-1]) weighted_sd = self.calculate_weighted_sd(vm_load[:-1])
if weighted_sd < min_sd: if weighted_sd < min_sd:
min_sd = weighted_sd min_sd = weighted_sd
hosts_load = vm_load[-1] hosts_load = vm_load[-1]
self.migrate(current_model, vm_host['vm'], self.migrate(vm_host['vm'],
vm_host['s_host'], vm_host['host']) vm_host['s_host'], vm_host['host'])
for metric, value in zip(self.metrics, vm_load[:-1]): for metric, value in zip(self.metrics, vm_load[:-1]):
@@ -408,4 +403,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
break break
if balanced: if balanced:
break break
return self.fill_solution(current_model) return self.fill_solution()
def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
self.solution.model = self.model

View File

@@ -27,6 +27,7 @@ CONF = cfg.CONF
class CollectorManager(object): class CollectorManager(object):
def get_cluster_model_collector(self, osc=None): def get_cluster_model_collector(self, osc=None):
""":param osc: an OpenStackClients instance""" """:param osc: an OpenStackClients instance"""
nova = nova_helper.NovaHelper(osc=osc) nova = nova_helper.NovaHelper(osc=osc)

View File

@@ -36,10 +36,11 @@ class SolutionFaker(object):
def build(): def build():
metrics = fake.FakerMetricsCollector() metrics = fake.FakerMetricsCollector()
current_state_cluster = faker_cluster_state.FakerModelCollector() current_state_cluster = faker_cluster_state.FakerModelCollector()
sercon = strategies.BasicConsolidation() sercon = strategies.BasicConsolidation(config=mock.Mock())
sercon.ceilometer = mock.\ sercon._model = current_state_cluster.generate_scenario_1()
MagicMock(get_statistics=metrics.mock_get_statistics) sercon.ceilometer = mock.MagicMock(
return sercon.execute(current_state_cluster.generate_scenario_1()) get_statistics=metrics.mock_get_statistics)
return sercon.execute()
class SolutionFakerSingleHyp(object): class SolutionFakerSingleHyp(object):
@@ -47,12 +48,12 @@ class SolutionFakerSingleHyp(object):
def build(): def build():
metrics = fake.FakerMetricsCollector() metrics = fake.FakerMetricsCollector()
current_state_cluster = faker_cluster_state.FakerModelCollector() current_state_cluster = faker_cluster_state.FakerModelCollector()
sercon = strategies.BasicConsolidation() sercon = strategies.BasicConsolidation(config=mock.Mock())
sercon.ceilometer = \ sercon._model = (
mock.MagicMock(get_statistics=metrics.mock_get_statistics)
return sercon.execute(
current_state_cluster.generate_scenario_3_with_2_hypervisors()) current_state_cluster.generate_scenario_3_with_2_hypervisors())
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
return sercon.execute()
class TestActionScheduling(base.DbTestCase): class TestActionScheduling(base.DbTestCase):

View File

@@ -13,6 +13,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock import mock
from watcher.decision_engine.solution.default import DefaultSolution from watcher.decision_engine.solution.default import DefaultSolution
@@ -20,10 +21,7 @@ from watcher.decision_engine.strategy.context.default import \
DefaultStrategyContext DefaultStrategyContext
from watcher.decision_engine.strategy.selection.default import \ from watcher.decision_engine.strategy.selection.default import \
DefaultStrategySelector DefaultStrategySelector
from watcher.decision_engine.strategy.strategies.dummy_strategy import \ from watcher.decision_engine.strategy.strategies import dummy_strategy
DummyStrategy
from watcher.metrics_engine.cluster_model_collector.manager import \
CollectorManager
from watcher.tests.db import base from watcher.tests.db import base
from watcher.tests.objects import utils as obj_utils from watcher.tests.objects import utils as obj_utils
@@ -32,18 +30,19 @@ class TestStrategyContext(base.DbTestCase):
def setUp(self): def setUp(self):
super(TestStrategyContext, self).setUp() super(TestStrategyContext, self).setUp()
obj_utils.create_test_goal(self.context, id=1, name="DUMMY") obj_utils.create_test_goal(self.context, id=1, name="DUMMY")
audit_template = obj_utils.create_test_audit_template( audit_template = obj_utils.create_test_audit_template(self.context)
self.context)
self.audit = obj_utils.create_test_audit( self.audit = obj_utils.create_test_audit(
self.context, audit_template_id=audit_template.id) self.context, audit_template_id=audit_template.id)
strategy_context = DefaultStrategyContext() strategy_context = DefaultStrategyContext()
@mock.patch.object(dummy_strategy.DummyStrategy, 'model',
new_callable=mock.PropertyMock)
@mock.patch.object(DefaultStrategySelector, 'select') @mock.patch.object(DefaultStrategySelector, 'select')
@mock.patch.object(CollectorManager, "get_cluster_model_collector", def test_execute_strategy(self, mock_call, m_model):
mock.Mock()) m_model.return_value = mock.Mock()
def test_execute_strategy(self, mock_call): mock_call.return_value = dummy_strategy.DummyStrategy(
mock_call.return_value = DummyStrategy() config=mock.Mock())
solution = self.strategy_context.execute_strategy( solution = self.strategy_context.execute_strategy(
self.audit.uuid, self.context) self.audit.uuid, self.context)
self.assertIsInstance(solution, DefaultSolution) self.assertIsInstance(solution, DefaultSolution)

View File

@@ -31,11 +31,30 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestBasicConsolidation(base.BaseTestCase): class TestBasicConsolidation(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster def setUp(self):
fake_cluster = faker_cluster_state.FakerModelCollector() super(TestBasicConsolidation, self).setUp()
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.BasicConsolidation, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.BasicConsolidation, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.BasicConsolidation(config=mock.Mock())
def test_cluster_size(self): def test_cluster_size(self):
size_cluster = len( size_cluster = len(
@@ -44,134 +63,98 @@ class TestBasicConsolidation(base.BaseTestCase):
self.assertEqual(size_cluster_assert, size_cluster) self.assertEqual(size_cluster_assert, size_cluster)
def test_basic_consolidation_score_hypervisor(self): def test_basic_consolidation_score_hypervisor(self):
cluster = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
sercon = strategies.BasicConsolidation() self.m_model.return_value = model
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
node_1_score = 0.023333333333333317 node_1_score = 0.023333333333333317
self.assertEqual(node_1_score, sercon.calculate_score_node( self.assertEqual(node_1_score, self.strategy.calculate_score_node(
cluster.get_hypervisor_from_id("Node_1"), model.get_hypervisor_from_id("Node_1")))
cluster))
node_2_score = 0.26666666666666666 node_2_score = 0.26666666666666666
self.assertEqual(node_2_score, sercon.calculate_score_node( self.assertEqual(node_2_score, self.strategy.calculate_score_node(
cluster.get_hypervisor_from_id("Node_2"), model.get_hypervisor_from_id("Node_2")))
cluster))
node_0_score = 0.023333333333333317 node_0_score = 0.023333333333333317
self.assertEqual(node_0_score, sercon.calculate_score_node( self.assertEqual(node_0_score, self.strategy.calculate_score_node(
cluster.get_hypervisor_from_id("Node_0"), model.get_hypervisor_from_id("Node_0")))
cluster))
def test_basic_consolidation_score_vm(self): def test_basic_consolidation_score_vm(self):
cluster = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
sercon = strategies.BasicConsolidation() self.m_model.return_value = model
sercon.ceilometer = mock.MagicMock( vm_0 = model.get_vm_from_id("VM_0")
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
vm_0_score = 0.023333333333333317 vm_0_score = 0.023333333333333317
self.assertEqual(vm_0_score, sercon.calculate_score_vm(vm_0, cluster)) self.assertEqual(vm_0_score, self.strategy.calculate_score_vm(vm_0))
vm_1 = cluster.get_vm_from_id("VM_1") vm_1 = model.get_vm_from_id("VM_1")
vm_1_score = 0.023333333333333317 vm_1_score = 0.023333333333333317
self.assertEqual(vm_1_score, sercon.calculate_score_vm(vm_1, cluster)) self.assertEqual(vm_1_score, self.strategy.calculate_score_vm(vm_1))
vm_2 = cluster.get_vm_from_id("VM_2") vm_2 = model.get_vm_from_id("VM_2")
vm_2_score = 0.033333333333333326 vm_2_score = 0.033333333333333326
self.assertEqual(vm_2_score, sercon.calculate_score_vm(vm_2, cluster)) self.assertEqual(vm_2_score, self.strategy.calculate_score_vm(vm_2))
vm_6 = cluster.get_vm_from_id("VM_6") vm_6 = model.get_vm_from_id("VM_6")
vm_6_score = 0.02666666666666669 vm_6_score = 0.02666666666666669
self.assertEqual(vm_6_score, sercon.calculate_score_vm(vm_6, cluster)) self.assertEqual(vm_6_score, self.strategy.calculate_score_vm(vm_6))
vm_7 = cluster.get_vm_from_id("VM_7") vm_7 = model.get_vm_from_id("VM_7")
vm_7_score = 0.013333333333333345 vm_7_score = 0.013333333333333345
self.assertEqual(vm_7_score, sercon.calculate_score_vm(vm_7, cluster)) self.assertEqual(vm_7_score, self.strategy.calculate_score_vm(vm_7))
def test_basic_consolidation_score_vm_disk(self): def test_basic_consolidation_score_vm_disk(self):
cluster = self.fake_cluster.generate_scenario_5_with_vm_disk_0() model = self.fake_cluster.generate_scenario_5_with_vm_disk_0()
sercon = strategies.BasicConsolidation() self.m_model.return_value = model
sercon.ceilometer = mock.MagicMock( vm_0 = model.get_vm_from_id("VM_0")
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
vm_0_score = 0.023333333333333355 vm_0_score = 0.023333333333333355
self.assertEqual(vm_0_score, sercon.calculate_score_vm(vm_0, cluster)) self.assertEqual(vm_0_score, self.strategy.calculate_score_vm(vm_0, ))
def test_basic_consolidation_weight(self): def test_basic_consolidation_weight(self):
cluster = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
sercon = strategies.BasicConsolidation() self.m_model.return_value = model
sercon.ceilometer = mock.MagicMock( vm_0 = model.get_vm_from_id("VM_0")
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
cores = 16 cores = 16
# 80 Go # 80 Go
disk = 80 disk = 80
# mem 8 Go # mem 8 Go
mem = 8 mem = 8
vm_0_weight_assert = 3.1999999999999997 vm_0_weight_assert = 3.1999999999999997
self.assertEqual(vm_0_weight_assert, self.assertEqual(
sercon.calculate_weight(cluster, vm_0, cores, disk, vm_0_weight_assert,
mem)) self.strategy.calculate_weight(vm_0, cores, disk, mem))
def test_calculate_migration_efficacy(self): def test_calculate_migration_efficacy(self):
sercon = strategies.BasicConsolidation() self.strategy.calculate_migration_efficacy()
sercon.calculate_migration_efficacy()
def test_exception_model(self): def test_exception_model(self):
sercon = strategies.BasicConsolidation() self.m_model.return_value = None
self.assertRaises(exception.ClusterStateNotDefined, sercon.execute, self.assertRaises(
None) exception.ClusterStateNotDefined, self.strategy.execute)
def test_exception_cluster_empty(self): def test_exception_cluster_empty(self):
sercon = strategies.BasicConsolidation()
model = model_root.ModelRoot() model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, sercon.execute, self.m_model.return_value = model
model) self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_calculate_score_vm_raise_cluster_state_not_found(self):
metrics = faker_metrics_collector.FakerMetricsCollector()
metrics.empty_one_metric("CPU_COMPUTE")
sercon = strategies.BasicConsolidation()
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.assertRaises(exception.ClusterStateNotDefined,
sercon.calculate_score_vm, "VM_1", None)
def test_check_migration(self): def test_check_migration(self):
sercon = strategies.BasicConsolidation() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
fake_cluster = faker_cluster_state.FakerModelCollector() self.m_model.return_value = model
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
all_vms = model.get_all_vms() all_vms = model.get_all_vms()
all_hyps = model.get_all_hypervisors() all_hyps = model.get_all_hypervisors()
vm0 = all_vms[list(all_vms.keys())[0]] vm0 = all_vms[list(all_vms.keys())[0]]
hyp0 = all_hyps[list(all_hyps.keys())[0]] hyp0 = all_hyps[list(all_hyps.keys())[0]]
sercon.check_migration(model, hyp0, hyp0, vm0) self.strategy.check_migration(hyp0, hyp0, vm0)
def test_threshold(self): def test_threshold(self):
sercon = strategies.BasicConsolidation() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
fake_cluster = faker_cluster_state.FakerModelCollector() self.m_model.return_value = model
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
all_hyps = model.get_all_hypervisors() all_hyps = model.get_all_hypervisors()
hyp0 = all_hyps[list(all_hyps.keys())[0]] hyp0 = all_hyps[list(all_hyps.keys())[0]]
sercon.check_threshold(model, hyp0, 1000, 1000, 1000) self.assertFalse(self.strategy.check_threshold(
hyp0, 1000, 1000, 1000))
threshold_cores = sercon.get_threshold_cores()
sercon.set_threshold_cores(threshold_cores + 1)
self.assertEqual(threshold_cores + 1, sercon.get_threshold_cores())
def test_number_of(self):
sercon = strategies.BasicConsolidation()
sercon.get_number_of_released_nodes()
sercon.get_number_of_migrations()
def test_basic_consolidation_migration(self): def test_basic_consolidation_migration(self):
sercon = strategies.BasicConsolidation() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
sercon.ceilometer = mock.MagicMock( self.m_model.return_value = model
statistic_aggregation=self.fake_metrics.mock_get_statistics)
solution = sercon.execute( solution = self.strategy.execute()
self.fake_cluster.generate_scenario_3_with_2_hypervisors())
actions_counter = collections.Counter( actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions]) [action.get('action_type') for action in solution.actions])
@@ -187,27 +170,22 @@ class TestBasicConsolidation(base.BaseTestCase):
# calculate_weight # calculate_weight
def test_execute_no_workload(self): def test_execute_no_workload(self):
sercon = strategies.BasicConsolidation() model = (
sercon.ceilometer = mock.MagicMock( self.fake_cluster
statistic_aggregation=self.fake_metrics.mock_get_statistics) .generate_scenario_4_with_1_hypervisor_no_vm())
self.m_model.return_value = model
current_state_cluster = faker_cluster_state.FakerModelCollector() with mock.patch.object(
model = current_state_cluster. \ strategies.BasicConsolidation, 'calculate_weight'
generate_scenario_4_with_1_hypervisor_no_vm() ) as mock_score_call:
with mock.patch.object(strategies.BasicConsolidation,
'calculate_weight') \
as mock_score_call:
mock_score_call.return_value = 0 mock_score_call.return_value = 0
solution = sercon.execute(model) solution = self.strategy.execute()
self.assertEqual(100, solution.efficacy) self.assertEqual(0, solution.efficacy)
def test_check_parameters(self): def test_check_parameters(self):
sercon = strategies.BasicConsolidation() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
sercon.ceilometer = mock.MagicMock( self.m_model.return_value = model
statistic_aggregation=self.fake_metrics.mock_get_statistics) solution = self.strategy.execute()
solution = sercon.execute(
self.fake_cluster.generate_scenario_3_with_2_hypervisors())
loader = default.DefaultActionLoader() loader = default.DefaultActionLoader()
for action in solution.actions: for action in solution.actions:
loaded_action = loader.load(action['action_type']) loaded_action = loader.load(action['action_type'])

View File

@@ -14,7 +14,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
from watcher.applier.actions.loading import default from watcher.applier.actions.loading import default
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies from watcher.decision_engine.strategy import strategies
from watcher.tests import base from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies import \ from watcher.tests.decision_engine.strategy.strategies import \
@@ -22,18 +25,33 @@ from watcher.tests.decision_engine.strategy.strategies import \
class TestDummyStrategy(base.TestCase): class TestDummyStrategy(base.TestCase):
def setUp(self):
super(TestDummyStrategy, self).setUp()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.DummyStrategy, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
self.m_model.return_value = model_root.ModelRoot()
self.strategy = strategies.DummyStrategy(config=mock.Mock())
self.m_model.return_value = model_root.ModelRoot()
self.strategy = strategies.DummyStrategy(config=mock.Mock())
def test_dummy_strategy(self): def test_dummy_strategy(self):
dummy = strategies.DummyStrategy() dummy = strategies.DummyStrategy(config=mock.Mock())
fake_cluster = faker_cluster_state.FakerModelCollector() solution = dummy.execute()
model = fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = dummy.execute(model)
self.assertEqual(3, len(solution.actions)) self.assertEqual(3, len(solution.actions))
def test_check_parameters(self): def test_check_parameters(self):
dummy = strategies.DummyStrategy() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
fake_cluster = faker_cluster_state.FakerModelCollector() self.m_model.return_value = model
model = fake_cluster.generate_scenario_3_with_2_hypervisors() solution = self.strategy.execute()
solution = dummy.execute(model)
loader = default.DefaultActionLoader() loader = default.DefaultActionLoader()
for action in solution.actions: for action in solution.actions:
loaded_action = loader.load(action['action_type']) loaded_action = loader.load(action['action_type'])

View File

@@ -32,93 +32,95 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestOutletTempControl(base.BaseTestCase): class TestOutletTempControl(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster def setUp(self):
fake_cluster = faker_cluster_state.FakerModelCollector() super(TestOutletTempControl, self).setUp()
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.OutletTempControl, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.OutletTempControl, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.OutletTempControl(config=mock.Mock())
def test_calc_used_res(self): def test_calc_used_res(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl() self.m_model.return_value = model
hypervisor = model.get_hypervisor_from_id('Node_0') hypervisor = model.get_hypervisor_from_id('Node_0')
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory) cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk) cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cores_used, mem_used, disk_used = strategy.calc_used_res(model, cores_used, mem_used, disk_used = self.strategy.calc_used_res(
hypervisor, hypervisor, cap_cores, cap_mem, cap_disk)
cap_cores,
cap_mem,
cap_disk)
self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used)) self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used))
def test_group_hosts_by_outlet_temp(self): def test_group_hosts_by_outlet_temp(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl() self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( h1, h2 = self.strategy.group_hosts_by_outlet_temp()
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
self.assertEqual('Node_1', h1[0]['hv'].uuid) self.assertEqual('Node_1', h1[0]['hv'].uuid)
self.assertEqual('Node_0', h2[0]['hv'].uuid) self.assertEqual('Node_0', h2[0]['hv'].uuid)
def test_choose_vm_to_migrate(self): def test_choose_vm_to_migrate(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl() self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( h1, h2 = self.strategy.group_hosts_by_outlet_temp()
statistic_aggregation=self.fake_metrics.mock_get_statistics) vm_to_mig = self.strategy.choose_vm_to_migrate(h1)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
self.assertEqual('Node_1', vm_to_mig[0].uuid) self.assertEqual('Node_1', vm_to_mig[0].uuid)
self.assertEqual('a4cab39b-9828-413a-bf88-f76921bf1517', self.assertEqual('a4cab39b-9828-413a-bf88-f76921bf1517',
vm_to_mig[1].uuid) vm_to_mig[1].uuid)
def test_filter_dest_servers(self): def test_filter_dest_servers(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = strategies.OutletTempControl() self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( h1, h2 = self.strategy.group_hosts_by_outlet_temp()
statistic_aggregation=self.fake_metrics.mock_get_statistics) vm_to_mig = self.strategy.choose_vm_to_migrate(h1)
h1, h2 = strategy.group_hosts_by_outlet_temp(model) dest_hosts = self.strategy.filter_dest_servers(h2, vm_to_mig[1])
vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
dest_hosts = strategy.filter_dest_servers(model, h2, vm_to_mig[1])
self.assertEqual(1, len(dest_hosts)) self.assertEqual(1, len(dest_hosts))
self.assertEqual('Node_0', dest_hosts[0]['hv'].uuid) self.assertEqual('Node_0', dest_hosts[0]['hv'].uuid)
def test_exception_model(self): def test_exception_model(self):
strategy = strategies.OutletTempControl() self.m_model.return_value = None
self.assertRaises(exception.ClusterStateNotDefined, strategy.execute, self.assertRaises(
None) exception.ClusterStateNotDefined, self.strategy.execute)
def test_exception_cluster_empty(self): def test_exception_cluster_empty(self):
strategy = strategies.OutletTempControl()
model = model_root.ModelRoot() model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model) self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_cluster_empty(self): def test_execute_cluster_empty(self):
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = model_root.ModelRoot() model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model) self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_no_workload(self): def test_execute_no_workload(self):
strategy = strategies.OutletTempControl() model = self.fake_cluster.generate_scenario_4_with_1_hypervisor_no_vm()
strategy.ceilometer = mock.MagicMock( self.m_model.return_value = model
statistic_aggregation=self.fake_metrics.mock_get_statistics)
current_state_cluster = faker_cluster_state.FakerModelCollector() solution = self.strategy.execute()
model = current_state_cluster. \
generate_scenario_4_with_1_hypervisor_no_vm()
solution = strategy.execute(model)
self.assertEqual([], solution.actions) self.assertEqual([], solution.actions)
def test_execute(self): def test_execute(self):
strategy = strategies.OutletTempControl()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = strategy.execute(model) self.m_model.return_value = model
solution = self.strategy.execute()
actions_counter = collections.Counter( actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions]) [action.get('action_type') for action in solution.actions])
@@ -126,11 +128,9 @@ class TestOutletTempControl(base.BaseTestCase):
self.assertEqual(1, num_migrations) self.assertEqual(1, num_migrations)
def test_check_parameters(self): def test_check_parameters(self):
outlet = strategies.OutletTempControl()
outlet.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = outlet.execute(model) self.m_model.return_value = model
solution = self.strategy.execute()
loader = default.DefaultActionLoader() loader = default.DefaultActionLoader()
for action in solution.actions: for action in solution.actions:
loaded_action = loader.load(action['action_type']) loaded_action = loader.load(action['action_type'])

View File

@@ -17,213 +17,215 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
import mock import mock
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies from watcher.decision_engine.strategy import strategies
from watcher.tests import base from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \ from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_and_metrics import faker_cluster_and_metrics
class TestSmartConsolidation(base.BaseTestCase): class TestVMWorkloadConsolidation(base.BaseTestCase):
fake_cluster = faker_cluster_and_metrics.FakerModelCollector()
def setUp(self):
super(TestVMWorkloadConsolidation, self).setUp()
# fake cluster
self.fake_cluster = faker_cluster_and_metrics.FakerModelCollector()
p_model = mock.patch.object(
strategies.VMWorkloadConsolidation, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.VMWorkloadConsolidation, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
# fake metrics
self.fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
self.m_model.return_value)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.VMWorkloadConsolidation(config=mock.Mock())
def test_get_vm_utilization(self): def test_get_vm_utilization(self):
cluster = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock( vm_0 = model.get_vm_from_id("VM_0")
statistic_aggregation=fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
vm_util = dict(cpu=1.0, ram=1, disk=10) vm_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(vm_util, self.assertEqual(vm_util,
strategy.get_vm_utilization(vm_0.uuid, cluster)) self.strategy.get_vm_utilization(vm_0.uuid, model))
def test_get_hypervisor_utilization(self): def test_get_hypervisor_utilization(self):
cluster = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock( node_0 = model.get_hypervisor_from_id("Node_0")
statistic_aggregation=fake_metrics.mock_get_statistics)
node_0 = cluster.get_hypervisor_from_id("Node_0")
node_util = dict(cpu=1.0, ram=1, disk=10) node_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(node_util, self.assertEqual(
strategy.get_hypervisor_utilization(node_0, cluster)) node_util,
self.strategy.get_hypervisor_utilization(node_0, model))
def test_get_hypervisor_capacity(self): def test_get_hypervisor_capacity(self):
cluster = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock( node_0 = model.get_hypervisor_from_id("Node_0")
statistic_aggregation=fake_metrics.mock_get_statistics)
node_0 = cluster.get_hypervisor_from_id("Node_0")
node_util = dict(cpu=40, ram=64, disk=250) node_util = dict(cpu=40, ram=64, disk=250)
self.assertEqual(node_util, self.assertEqual(node_util,
strategy.get_hypervisor_capacity(node_0, cluster)) self.strategy.get_hypervisor_capacity(node_0, model))
def test_get_relative_hypervisor_utilization(self): def test_get_relative_hypervisor_utilization(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
hypervisor = model.get_hypervisor_from_id('Node_0') hypervisor = model.get_hypervisor_from_id('Node_0')
rhu = strategy.get_relative_hypervisor_utilization(hypervisor, model) rhu = self.strategy.get_relative_hypervisor_utilization(
hypervisor, model)
expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025} expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025}
self.assertEqual(expected_rhu, rhu) self.assertEqual(expected_rhu, rhu)
def test_get_relative_cluster_utilization(self): def test_get_relative_cluster_utilization(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock( cru = self.strategy.get_relative_cluster_utilization(model)
statistic_aggregation=fake_metrics.mock_get_statistics)
cru = strategy.get_relative_cluster_utilization(model)
expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375} expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375}
self.assertEqual(expected_cru, cru) self.assertEqual(expected_cru, cru)
def test_add_migration(self): def test_add_migration(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0') h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1') h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0' vm_uuid = 'VM_0'
strategy.add_migration(vm_uuid, h1, h2, model) self.strategy.add_migration(vm_uuid, h1, h2, model)
self.assertEqual(1, len(strategy.solution.actions)) self.assertEqual(1, len(self.strategy.solution.actions))
expected = {'action_type': 'migrate', expected = {'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid, 'input_parameters': {'dst_hypervisor': h2.uuid,
'src_hypervisor': h1.uuid, 'src_hypervisor': h1.uuid,
'migration_type': 'live', 'migration_type': 'live',
'resource_id': vm_uuid}} 'resource_id': vm_uuid}}
self.assertEqual(expected, strategy.solution.actions[0]) self.assertEqual(expected, self.strategy.solution.actions[0])
def test_is_overloaded(self): def test_is_overloaded(self):
strategy = strategies.VMWorkloadConsolidation()
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( self.fake_metrics.model = model
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0') h1 = model.get_hypervisor_from_id('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc) res = self.strategy.is_overloaded(h1, model, cc)
self.assertEqual(False, res) self.assertEqual(False, res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc) res = self.strategy.is_overloaded(h1, model, cc)
self.assertEqual(False, res) self.assertEqual(False, res)
cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc) res = self.strategy.is_overloaded(h1, model, cc)
self.assertEqual(True, res) self.assertEqual(True, res)
def test_vm_fits(self): def test_vm_fits(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h = model.get_hypervisor_from_id('Node_1') h = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0' vm_uuid = 'VM_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = strategy.vm_fits(vm_uuid, h, model, cc) res = self.strategy.vm_fits(vm_uuid, h, model, cc)
self.assertEqual(True, res) self.assertEqual(True, res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = strategy.vm_fits(vm_uuid, h, model, cc) res = self.strategy.vm_fits(vm_uuid, h, model, cc)
self.assertEqual(False, res) self.assertEqual(False, res)
def test_add_action_activate_hypervisor(self): def test_add_action_activate_hypervisor(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h = model.get_hypervisor_from_id('Node_0') h = model.get_hypervisor_from_id('Node_0')
strategy.add_action_activate_hypervisor(h) self.strategy.add_action_activate_hypervisor(h)
expected = [{'action_type': 'change_nova_service_state', expected = [{'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'up', 'input_parameters': {'state': 'up',
'resource_id': 'Node_0'}}] 'resource_id': 'Node_0'}}]
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
def test_add_action_deactivate_hypervisor(self): def test_add_action_deactivate_hypervisor(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h = model.get_hypervisor_from_id('Node_0') h = model.get_hypervisor_from_id('Node_0')
strategy.add_action_deactivate_hypervisor(h) self.strategy.add_action_deactivate_hypervisor(h)
expected = [{'action_type': 'change_nova_service_state', expected = [{'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'down', 'input_parameters': {'state': 'down',
'resource_id': 'Node_0'}}] 'resource_id': 'Node_0'}}]
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
def test_deactivate_unused_hypervisors(self): def test_deactivate_unused_hypervisors(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0') h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1') h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0' vm_uuid = 'VM_0'
strategy.deactivate_unused_hypervisors(model) self.strategy.deactivate_unused_hypervisors(model)
self.assertEqual(0, len(strategy.solution.actions)) self.assertEqual(0, len(self.strategy.solution.actions))
# Migrate VM to free the hypervisor # Migrate VM to free the hypervisor
strategy.add_migration(vm_uuid, h1, h2, model) self.strategy.add_migration(vm_uuid, h1, h2, model)
strategy.deactivate_unused_hypervisors(model) self.strategy.deactivate_unused_hypervisors(model)
expected = {'action_type': 'change_nova_service_state', expected = {'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'down', 'input_parameters': {'state': 'down',
'resource_id': 'Node_0'}} 'resource_id': 'Node_0'}}
self.assertEqual(2, len(strategy.solution.actions)) self.assertEqual(2, len(self.strategy.solution.actions))
self.assertEqual(expected, strategy.solution.actions[1]) self.assertEqual(expected, self.strategy.solution.actions[1])
def test_offload_phase(self): def test_offload_phase(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc) self.strategy.offload_phase(model, cc)
expected = [] expected = []
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
def test_consolidation_phase(self): def test_consolidation_phase(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0') h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1') h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0' vm_uuid = 'VM_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.consolidation_phase(model, cc) self.strategy.consolidation_phase(model, cc)
expected = [{'action_type': 'migrate', expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid, 'input_parameters': {'dst_hypervisor': h2.uuid,
'src_hypervisor': h1.uuid, 'src_hypervisor': h1.uuid,
'migration_type': 'live', 'migration_type': 'live',
'resource_id': vm_uuid}}] 'resource_id': vm_uuid}}]
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
def test_strategy(self): def test_strategy(self):
model = self.fake_cluster.generate_scenario_2() model = self.fake_cluster.generate_scenario_2()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0') h1 = model.get_hypervisor_from_id('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc) self.strategy.offload_phase(model, cc)
strategy.consolidation_phase(model, cc) self.strategy.consolidation_phase(model, cc)
strategy.optimize_solution(model) self.strategy.optimize_solution(model)
h2 = strategy.solution.actions[0][ h2 = self.strategy.solution.actions[0][
'input_parameters']['dst_hypervisor'] 'input_parameters']['dst_hypervisor']
expected = [{'action_type': 'migrate', expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2, 'input_parameters': {'dst_hypervisor': h2,
@@ -236,18 +238,16 @@ class TestSmartConsolidation(base.BaseTestCase):
'migration_type': 'live', 'migration_type': 'live',
'resource_id': 'VM_1'}}] 'resource_id': 'VM_1'}}]
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
def test_strategy2(self): def test_strategy2(self):
model = self.fake_cluster.generate_scenario_3() model = self.fake_cluster.generate_scenario_3()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) self.m_model.return_value = model
strategy = strategies.VMWorkloadConsolidation() self.fake_metrics.model = model
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0') h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1') h2 = model.get_hypervisor_from_id('Node_1')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc) self.strategy.offload_phase(model, cc)
expected = [{'action_type': 'migrate', expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid, 'input_parameters': {'dst_hypervisor': h2.uuid,
'migration_type': 'live', 'migration_type': 'live',
@@ -263,15 +263,15 @@ class TestSmartConsolidation(base.BaseTestCase):
'migration_type': 'live', 'migration_type': 'live',
'resource_id': 'VM_8', 'resource_id': 'VM_8',
'src_hypervisor': h1.uuid}}] 'src_hypervisor': h1.uuid}}]
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
strategy.consolidation_phase(model, cc) self.strategy.consolidation_phase(model, cc)
expected.append({'action_type': 'migrate', expected.append({'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h1.uuid, 'input_parameters': {'dst_hypervisor': h1.uuid,
'migration_type': 'live', 'migration_type': 'live',
'resource_id': 'VM_7', 'resource_id': 'VM_7',
'src_hypervisor': h2.uuid}}) 'src_hypervisor': h2.uuid}})
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)
strategy.optimize_solution(model) self.strategy.optimize_solution(model)
del expected[3] del expected[3]
del expected[1] del expected[1]
self.assertEqual(expected, strategy.solution.actions) self.assertEqual(expected, self.strategy.solution.actions)

View File

@@ -23,7 +23,7 @@ from watcher.applier.actions.loading import default
from watcher.common import exception from watcher.common import exception
from watcher.decision_engine.model import model_root from watcher.decision_engine.model import model_root
from watcher.decision_engine.model import resource from watcher.decision_engine.model import resource
from watcher.decision_engine.strategy.strategies import workload_balance from watcher.decision_engine.strategy import strategies
from watcher.tests import base from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \ from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_state import faker_cluster_state
@@ -32,31 +32,51 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestWorkloadBalance(base.BaseTestCase): class TestWorkloadBalance(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster def setUp(self):
fake_cluster = faker_cluster_state.FakerModelCollector() super(TestWorkloadBalance, self).setUp()
# fake metrics
self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.WorkloadBalance, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.BasicConsolidation, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.WorkloadBalance(config=mock.Mock())
def test_calc_used_res(self): def test_calc_used_res(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance() self.m_model.return_value = model
hypervisor = model.get_hypervisor_from_id('Node_0') hypervisor = model.get_hypervisor_from_id('Node_0')
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory) cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk) cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cores_used, mem_used, disk_used = strategy.calculate_used_resource( cores_used, mem_used, disk_used = (
model, hypervisor, cap_cores, cap_mem, cap_disk) self.strategy.calculate_used_resource(
hypervisor, cap_cores, cap_mem, cap_disk))
self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40)) self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40))
def test_group_hosts_by_cpu_util(self): def test_group_hosts_by_cpu_util(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance() self.m_model.return_value = model
strategy.threshold = 30 self.strategy.threshold = 30
strategy.ceilometer = mock.MagicMock( self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
# print h1, h2, avg, w_map # print h1, h2, avg, w_map
self.assertEqual(h1[0]['hv'].uuid, 'Node_0') self.assertEqual(h1[0]['hv'].uuid, 'Node_0')
self.assertEqual(h2[0]['hv'].uuid, 'Node_1') self.assertEqual(h2[0]['hv'].uuid, 'Node_1')
@@ -64,73 +84,69 @@ class TestWorkloadBalance(base.BaseTestCase):
def test_choose_vm_to_migrate(self): def test_choose_vm_to_migrate(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance() self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map) vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map)
self.assertEqual(vm_to_mig[0].uuid, 'Node_0') self.assertEqual(vm_to_mig[0].uuid, 'Node_0')
self.assertEqual(vm_to_mig[1].uuid, self.assertEqual(vm_to_mig[1].uuid,
"73b09e16-35b7-4922-804e-e8f5d9b740fc") "73b09e16-35b7-4922-804e-e8f5d9b740fc")
def test_choose_vm_notfound(self): def test_choose_vm_notfound(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance() self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
vms = model.get_all_vms() vms = model.get_all_vms()
vms.clear() vms.clear()
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map) vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map)
self.assertEqual(vm_to_mig, None) self.assertEqual(vm_to_mig, None)
def test_filter_destination_hosts(self): def test_filter_destination_hosts(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance() self.m_model.return_value = model
strategy.ceilometer = mock.MagicMock( self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map) vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map)
dest_hosts = strategy.filter_destination_hosts(model, h2, vm_to_mig[1], dest_hosts = self.strategy.filter_destination_hosts(
avg, w_map) h2, vm_to_mig[1], avg, w_map)
self.assertEqual(len(dest_hosts), 1) self.assertEqual(len(dest_hosts), 1)
self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_1') self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_1')
def test_exception_model(self): def test_exception_model(self):
strategy = workload_balance.WorkloadBalance() self.m_model.return_value = None
self.assertRaises(exception.ClusterStateNotDefined, strategy.execute, self.assertRaises(
None) exception.ClusterStateNotDefined, self.strategy.execute)
def test_exception_cluster_empty(self): def test_exception_cluster_empty(self):
strategy = workload_balance.WorkloadBalance()
model = model_root.ModelRoot() model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model) self.m_model.return_value = model
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_cluster_empty(self): def test_execute_cluster_empty(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = model_root.ModelRoot() model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model) self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
self.assertRaises(exception.ClusterEmpty, self.strategy.execute)
def test_execute_no_workload(self): def test_execute_no_workload(self):
strategy = workload_balance.WorkloadBalance() model = self.fake_cluster.generate_scenario_4_with_1_hypervisor_no_vm()
strategy.ceilometer = mock.MagicMock( self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
solution = self.strategy.execute()
current_state_cluster = faker_cluster_state.FakerModelCollector()
model = current_state_cluster. \
generate_scenario_4_with_1_hypervisor_no_vm()
solution = strategy.execute(model)
self.assertEqual(solution.actions, []) self.assertEqual(solution.actions, [])
def test_execute(self): def test_execute(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
solution = strategy.execute(model) self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
solution = self.strategy.execute()
actions_counter = collections.Counter( actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions]) [action.get('action_type') for action in solution.actions])
@@ -138,11 +154,11 @@ class TestWorkloadBalance(base.BaseTestCase):
self.assertEqual(num_migrations, 1) self.assertEqual(num_migrations, 1)
def test_check_parameters(self): def test_check_parameters(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
solution = strategy.execute(model) self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
solution = self.strategy.execute()
loader = default.DefaultActionLoader() loader = default.DefaultActionLoader()
for action in solution.actions: for action in solution.actions:
loaded_action = loader.load(action['action_type']) loaded_action = loader.load(action['action_type'])

View File

@@ -19,6 +19,7 @@
import mock import mock
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies from watcher.decision_engine.strategy import strategies
from watcher.tests import base from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \ from watcher.tests.decision_engine.strategy.strategies \
@@ -28,40 +29,48 @@ from watcher.tests.decision_engine.strategy.strategies \
class TestWorkloadStabilization(base.BaseTestCase): class TestWorkloadStabilization(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster def setUp(self):
fake_cluster = faker_cluster_state.FakerModelCollector() super(TestWorkloadStabilization, self).setUp()
hosts_load_assert = {'Node_0': # fake metrics
{'cpu_util': 0.07, 'memory.resident': 7.0, self.fake_metrics = faker_metrics_collector.FakerMetricsCollector()
'vcpus': 40},
'Node_1': # fake cluster
{'cpu_util': 0.05, 'memory.resident': 5, self.fake_cluster = faker_cluster_state.FakerModelCollector()
'vcpus': 40},
'Node_2': self.hosts_load_assert = {
{'cpu_util': 0.1, 'memory.resident': 29, 'Node_0': {'cpu_util': 0.07, 'memory.resident': 7.0, 'vcpus': 40},
'vcpus': 40}, 'Node_1': {'cpu_util': 0.05, 'memory.resident': 5, 'vcpus': 40},
'Node_3': 'Node_2': {'cpu_util': 0.1, 'memory.resident': 29, 'vcpus': 40},
{'cpu_util': 0.04, 'memory.resident': 8, 'Node_3': {'cpu_util': 0.04, 'memory.resident': 8, 'vcpus': 40},
'vcpus': 40}, 'Node_4': {'cpu_util': 0.02, 'memory.resident': 4, 'vcpus': 40}}
'Node_4':
{'cpu_util': 0.02, 'memory.resident': 4, p_model = mock.patch.object(
'vcpus': 40}} strategies.WorkloadStabilization, "model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.WorkloadStabilization, "ceilometer",
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.WorkloadStabilization(config=mock.Mock())
def test_get_vm_load(self): def test_get_vm_load(self):
model = self.fake_cluster.generate_scenario_1() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
sd = strategies.WorkloadStabilization()
sd.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0_dict = {'uuid': 'VM_0', 'vcpus': 10, vm_0_dict = {'uuid': 'VM_0', 'vcpus': 10,
'cpu_util': 7, 'memory.resident': 2} 'cpu_util': 7, 'memory.resident': 2}
self.assertEqual(sd.get_vm_load("VM_0", model), vm_0_dict) self.assertEqual(vm_0_dict, self.strategy.get_vm_load("VM_0"))
def test_normalize_hosts_load(self): def test_normalize_hosts_load(self):
model = self.fake_cluster.generate_scenario_1() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
sd = strategies.WorkloadStabilization()
fake_hosts = {'Node_0': {'cpu_util': 0.07, 'memory.resident': 7}, fake_hosts = {'Node_0': {'cpu_util': 0.07, 'memory.resident': 7},
'Node_1': {'cpu_util': 0.05, 'memory.resident': 5}} 'Node_1': {'cpu_util': 0.05, 'memory.resident': 5}}
normalized_hosts = {'Node_0': normalized_hosts = {'Node_0':
@@ -70,99 +79,82 @@ class TestWorkloadStabilization(base.BaseTestCase):
'Node_1': 'Node_1':
{'cpu_util': 0.05, {'cpu_util': 0.05,
'memory.resident': 0.03787878787878788}} 'memory.resident': 0.03787878787878788}}
self.assertEqual(sd.normalize_hosts_load(fake_hosts, model), self.assertEqual(
normalized_hosts) normalized_hosts,
self.strategy.normalize_hosts_load(fake_hosts))
def test_get_hosts_load(self): def test_get_hosts_load(self):
sd = strategies.WorkloadStabilization() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
sd.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.assertEqual( self.assertEqual(
sd.get_hosts_load(self.fake_cluster.generate_scenario_1()), self.strategy.get_hosts_load(),
self.hosts_load_assert) self.hosts_load_assert)
def test_get_sd(self): def test_get_sd(self):
sd = strategies.WorkloadStabilization()
test_cpu_sd = 0.027 test_cpu_sd = 0.027
test_ram_sd = 9.3 test_ram_sd = 9.3
self.assertEqual( self.assertEqual(
round(sd.get_sd(self.hosts_load_assert, 'cpu_util'), 3), round(self.strategy.get_sd(
self.hosts_load_assert, 'cpu_util'), 3),
test_cpu_sd) test_cpu_sd)
self.assertEqual( self.assertEqual(
round(sd.get_sd(self.hosts_load_assert, 'memory.resident'), 1), round(self.strategy.get_sd(
self.hosts_load_assert, 'memory.resident'), 1),
test_ram_sd) test_ram_sd)
def test_calculate_weighted_sd(self): def test_calculate_weighted_sd(self):
sd = strategies.WorkloadStabilization()
sd_case = [0.5, 0.75] sd_case = [0.5, 0.75]
self.assertEqual(sd.calculate_weighted_sd(sd_case), 1.25) self.assertEqual(self.strategy.calculate_weighted_sd(sd_case), 1.25)
def test_calculate_migration_case(self): def test_calculate_migration_case(self):
model = self.fake_cluster.generate_scenario_1() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
sd = strategies.WorkloadStabilization() self.assertEqual(
sd.ceilometer = mock.MagicMock( self.strategy.calculate_migration_case(
statistic_aggregation=self.fake_metrics.mock_get_statistics) self.hosts_load_assert, "VM_5",
self.assertEqual(sd.calculate_migration_case( "Node_2", "Node_1")[-1]["Node_1"],
self.hosts_load_assert, "VM_5", "Node_2", "Node_1",
model)[-1]["Node_1"],
{'cpu_util': 2.55, 'memory.resident': 21, 'vcpus': 40}) {'cpu_util': 2.55, 'memory.resident': 21, 'vcpus': 40})
def test_simulate_migrations(self): def test_simulate_migrations(self):
sd = strategies.WorkloadStabilization() model = self.fake_cluster.generate_scenario_1()
sd.host_choice = 'retry' self.m_model.return_value = model
sd.ceilometer = mock.MagicMock( self.strategy.host_choice = 'retry'
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.assertEqual( self.assertEqual(
len(sd.simulate_migrations(self.fake_cluster.generate_scenario_1(), 8,
self.hosts_load_assert)), len(self.strategy.simulate_migrations(self.hosts_load_assert)))
8)
def test_check_threshold(self): def test_check_threshold(self):
sd = strategies.WorkloadStabilization() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2}
sd.ceilometer = mock.MagicMock( self.strategy.simulate_migrations = mock.Mock(return_value=True)
statistic_aggregation=self.fake_metrics.mock_get_statistics) self.assertTrue(self.strategy.check_threshold())
sd.simulate_migrations = mock.Mock(return_value=True)
self.assertTrue(
sd.check_threshold(self.fake_cluster.generate_scenario_1()))
def test_execute_one_migration(self): def test_execute_one_migration(self):
sd = strategies.WorkloadStabilization() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
model = self.fake_cluster.generate_scenario_1() self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2}
sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} self.strategy.simulate_migrations = mock.Mock(
sd.ceilometer = mock.MagicMock( return_value=[{'vm': 'VM_4', 's_host': 'Node_2', 'host': 'Node_1'}]
statistic_aggregation=self.fake_metrics.mock_get_statistics) )
sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4', with mock.patch.object(self.strategy, 'migrate') as mock_migration:
's_host': 'Node_2', self.strategy.execute()
'host': 'Node_1'}]) mock_migration.assert_called_once_with(
with mock.patch.object(sd, 'migrate') as mock_migration: 'VM_4', 'Node_2', 'Node_1')
sd.execute(model)
mock_migration.assert_called_once_with(model, 'VM_4', 'Node_2',
'Node_1')
def test_execute_multiply_migrations(self): def test_execute_multiply_migrations(self):
sd = strategies.WorkloadStabilization() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
model = self.fake_cluster.generate_scenario_1() self.strategy.thresholds = {'cpu_util': 0.00001,
sd.thresholds = {'cpu_util': 0.00001, 'memory.resident': 0.0001} 'memory.resident': 0.0001}
sd.ceilometer = mock.MagicMock( self.strategy.simulate_migrations = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics) return_value=[{'vm': 'VM_4', 's_host': 'Node_2', 'host': 'Node_1'},
sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4', {'vm': 'VM_3', 's_host': 'Node_2', 'host': 'Node_3'}]
's_host': 'Node_2', )
'host': 'Node_1'}, with mock.patch.object(self.strategy, 'migrate') as mock_migrate:
{'vm': 'VM_3', self.strategy.execute()
's_host': 'Node_2',
'host': 'Node_4'}])
with mock.patch.object(sd, 'migrate') as mock_migrate:
sd.execute(model)
self.assertEqual(mock_migrate.call_count, 1) self.assertEqual(mock_migrate.call_count, 1)
def test_execute_nothing_to_migrate(self): def test_execute_nothing_to_migrate(self):
sd = strategies.WorkloadStabilization() self.m_model.return_value = self.fake_cluster.generate_scenario_1()
model = self.fake_cluster.generate_scenario_1() self.strategy.thresholds = {'cpu_util': 0.042,
sd.thresholds = {'cpu_util': 0.042, 'memory.resident': 0.0001} 'memory.resident': 0.0001}
sd.ceilometer = mock.MagicMock( self.strategy.simulate_migrations = mock.Mock(return_value=False)
statistic_aggregation=self.fake_metrics.mock_get_statistics) with mock.patch.object(self.strategy, 'migrate') as mock_migrate:
sd.simulate_migrations = mock.Mock(return_value=False) self.strategy.execute()
with mock.patch.object(sd, 'migrate') as mock_migrate:
sd.execute(model)
mock_migrate.assert_not_called() mock_migrate.assert_not_called()