diff --git a/watcher/decision_engine/strategy/context/default.py b/watcher/decision_engine/strategy/context/default.py index c2e1e204f..9d49b6f26 100644 --- a/watcher/decision_engine/strategy/context/default.py +++ b/watcher/decision_engine/strategy/context/default.py @@ -18,7 +18,6 @@ from oslo_log import log from watcher.common import clients from watcher.decision_engine.strategy.context import base from watcher.decision_engine.strategy.selection import default -from watcher.metrics_engine.cluster_model_collector import manager from watcher import objects @@ -30,11 +29,6 @@ class DefaultStrategyContext(base.BaseStrategyContext): def __init__(self): super(DefaultStrategyContext, self).__init__() LOG.debug("Initializing Strategy Context") - self._collector_manager = manager.CollectorManager() - - @property - def collector(self): - return self._collector_manager def execute_strategy(self, audit_uuid, request_context): audit = objects.Audit.get_by_uuid(request_context, audit_uuid) @@ -46,10 +40,6 @@ class DefaultStrategyContext(base.BaseStrategyContext): osc = clients.OpenStackClients() # todo(jed) retrieve in audit_template parameters (threshold,...) # todo(jed) create ActionPlan - collector_manager = self.collector.get_cluster_model_collector(osc=osc) - - # todo(jed) remove call to get_latest_cluster_data_model - cluster_data_model = collector_manager.get_latest_cluster_data_model() strategy_selector = default.DefaultStrategySelector( goal_name=objects.Goal.get_by_id( @@ -59,5 +49,4 @@ class DefaultStrategyContext(base.BaseStrategyContext): selected_strategy = strategy_selector.select() - # todo(jed) add parameters and remove cluster_data_model - return selected_strategy.execute(cluster_data_model) + return selected_strategy.execute() diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py index 741421201..76e5887b6 100644 --- a/watcher/decision_engine/strategy/strategies/__init__.py +++ b/watcher/decision_engine/strategy/strategies/__init__.py @@ -20,14 +20,16 @@ from watcher.decision_engine.strategy.strategies import dummy_strategy from watcher.decision_engine.strategy.strategies import outlet_temp_control from watcher.decision_engine.strategy.strategies import \ vm_workload_consolidation +from watcher.decision_engine.strategy.strategies import workload_balance from watcher.decision_engine.strategy.strategies import workload_stabilization BasicConsolidation = basic_consolidation.BasicConsolidation OutletTempControl = outlet_temp_control.OutletTempControl DummyStrategy = dummy_strategy.DummyStrategy VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation +WorkloadBalance = workload_balance.WorkloadBalance WorkloadStabilization = workload_stabilization.WorkloadStabilization -__all__ = ("BasicConsolidation", "OutletTempControl", - "DummyStrategy", "VMWorkloadConsolidation", +__all__ = ("BasicConsolidation", "OutletTempControl", "DummyStrategy", + "VMWorkloadConsolidation", "WorkloadBalance", "WorkloadStabilization") diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index 680e0e470..92863193c 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -39,12 +39,12 @@ which are dynamically loaded by Watcher at launch time. import abc import six -from watcher._i18n import _ from watcher.common import clients from watcher.common.loader import loadable from watcher.decision_engine.loading import default as loading from watcher.decision_engine.solution import default from watcher.decision_engine.strategy.common import level +from watcher.metrics_engine.cluster_model_collector import manager @six.add_metaclass(abc.ABCMeta) @@ -56,7 +56,13 @@ class BaseStrategy(loadable.Loadable): """ def __init__(self, config, osc=None): - """:param osc: an OpenStackClients instance""" + """Constructor: the signature should be identical within the subclasses + + :param config: Configuration related to this plugin + :type config: :py:class:`~.Struct` + :param osc: An OpenStackClients instance + :type osc: :py:class:`~.OpenStackClients` instance + """ super(BaseStrategy, self).__init__(config) self._name = self.get_name() self._display_name = self.get_display_name() @@ -66,6 +72,9 @@ class BaseStrategy(loadable.Loadable): # the solution given by the strategy self._solution = default.DefaultSolution() self._osc = osc + self._collector_manager = None + self._model = None + self._goal = None @classmethod @abc.abstractmethod @@ -109,14 +118,60 @@ class BaseStrategy(loadable.Loadable): return [] @abc.abstractmethod - def execute(self, original_model): + def pre_execute(self): + """Pre-execution phase + + This can be used to fetch some pre-requisites or data. + """ + raise NotImplementedError() + + @abc.abstractmethod + def do_execute(self): + """Strategy execution phase + + This phase is where you should put the main logic of your strategy. + """ + raise NotImplementedError() + + @abc.abstractmethod + def post_execute(self): + """Post-execution phase + + This can be used to compute the global efficacy + """ + raise NotImplementedError() + + def execute(self): """Execute a strategy - :param original_model: The model the strategy is executed on - :type model: str :return: A computed solution (via a placement algorithm) - :rtype: :class:`watcher.decision_engine.solution.base.BaseSolution` + :rtype: :py:class:`~.BaseSolution` instance """ + self.pre_execute() + self.do_execute() + self.post_execute() + + return self.solution + + @property + def collector(self): + if self._collector_manager is None: + self._collector_manager = manager.CollectorManager() + return self._collector_manager + + @property + def model(self): + """Cluster data model + + :returns: Cluster data model the strategy is executed on + :rtype model: :py:class:`~.ModelRoot` instance + """ + if self._model is None: + collector = self.collector.get_cluster_model_collector( + osc=self.osc) + self._model = collector.get_latest_cluster_data_model() + + return self._model @property def osc(self): @@ -140,6 +195,10 @@ class BaseStrategy(loadable.Loadable): def display_name(self): return self._display_name + @property + def goal(self): + return self._goal + @property def strategy_level(self): return self._strategy_level @@ -202,11 +261,3 @@ class WorkloadStabilizationBaseStrategy(BaseStrategy): @classmethod def get_goal_name(cls): return "workload_balancing" - - @classmethod - def get_goal_display_name(cls): - return _("Workload balancing") - - @classmethod - def get_translatable_goal_display_name(cls): - return "Workload balancing" diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index 8a60525ae..b88f20f9d 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -71,11 +71,11 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): MIGRATION = "migrate" CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state" - def __init__(self, config=None, osc=None): + def __init__(self, config, osc=None): """Basic offline Consolidation using live migration :param config: A mapping containing the configuration of this strategy - :type config: dict + :type config: :py:class:`~.Struct` instance :param osc: :py:class:`~.OpenStackClients` instance """ super(BasicConsolidation, self).__init__(config, osc) @@ -134,17 +134,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def compute_attempts(self, size_cluster): """Upper bound of the number of migration - :param size_cluster: + :param size_cluster: The size of the cluster """ self.migration_attempts = size_cluster * self.bound_migration - def check_migration(self, cluster_data_model, - src_hypervisor, - dest_hypervisor, - vm_to_mig): - """check if the migration is possible + def check_migration(self, src_hypervisor, dest_hypervisor, vm_to_mig): + """Check if the migration is possible - :param cluster_data_model: the current state of the cluster + :param self.model: the current state of the cluster :param src_hypervisor: the current node of the virtual machine :param dest_hypervisor: the destination of the virtual machine :param vm_to_mig: the virtual machine @@ -153,24 +150,22 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): if src_hypervisor == dest_hypervisor: return False - LOG.debug('Migrate VM {0} from {1} to {2} '.format(vm_to_mig, - src_hypervisor, - dest_hypervisor, - )) + LOG.debug('Migrate VM %s from %s to %s', + vm_to_mig, src_hypervisor, dest_hypervisor) total_cores = 0 total_disk = 0 total_mem = 0 - cpu_capacity = cluster_data_model.get_resource_from_id( + cpu_capacity = self.model.get_resource_from_id( resource.ResourceType.cpu_cores) - disk_capacity = cluster_data_model.get_resource_from_id( + disk_capacity = self.model.get_resource_from_id( resource.ResourceType.disk) - memory_capacity = cluster_data_model.get_resource_from_id( + memory_capacity = self.model.get_resource_from_id( resource.ResourceType.memory) - for vm_id in cluster_data_model. \ + for vm_id in self.model. \ get_mapping().get_node_vms(dest_hypervisor): - vm = cluster_data_model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) total_cores += cpu_capacity.get_capacity(vm) total_disk += disk_capacity.get_capacity(vm) total_mem += memory_capacity.get_capacity(vm) @@ -180,42 +175,33 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): total_disk += disk_capacity.get_capacity(vm_to_mig) total_mem += memory_capacity.get_capacity(vm_to_mig) - return self.check_threshold(cluster_data_model, - dest_hypervisor, - total_cores, - total_disk, + return self.check_threshold(dest_hypervisor, total_cores, total_disk, total_mem) - def check_threshold(self, cluster_data_model, - dest_hypervisor, - total_cores, - total_disk, - total_mem): + def check_threshold(self, dest_hypervisor, total_cores, + total_disk, total_mem): """Check threshold check the threshold value defined by the ratio of aggregated CPU capacity of VMs on one node to CPU capacity of this node must not exceed the threshold value. - :param cluster_data_model: the current state of the cluster + :param self.model: the current state of the cluster :param dest_hypervisor: the destination of the virtual machine :param total_cores :param total_disk :param total_mem :return: True if the threshold is not exceed """ - cpu_capacity = cluster_data_model.get_resource_from_id( + cpu_capacity = self.model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor) - disk_capacity = cluster_data_model.get_resource_from_id( + disk_capacity = self.model.get_resource_from_id( resource.ResourceType.disk).get_capacity(dest_hypervisor) - memory_capacity = cluster_data_model.get_resource_from_id( + memory_capacity = self.model.get_resource_from_id( resource.ResourceType.memory).get_capacity(dest_hypervisor) - if (cpu_capacity >= total_cores * self.threshold_cores and + return (cpu_capacity >= total_cores * self.threshold_cores and disk_capacity >= total_disk * self.threshold_disk and - memory_capacity >= total_mem * self.threshold_mem): - return True - else: - return False + memory_capacity >= total_mem * self.threshold_mem) def get_allowed_migration_attempts(self): """Allowed migration @@ -226,37 +212,24 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): """ return self.migration_attempts - def get_threshold_cores(self): - return self.threshold_cores - - def set_threshold_cores(self, threshold): - self.threshold_cores = threshold - - def get_number_of_released_nodes(self): - return self.number_of_released_nodes - - def get_number_of_migrations(self): - return self.number_of_migrations - - def calculate_weight(self, cluster_data_model, element, - total_cores_used, total_disk_used, + def calculate_weight(self, element, total_cores_used, total_disk_used, total_memory_used): """Calculate weight of every resource - :param cluster_data_model: + :param self.model: :param element: :param total_cores_used: :param total_disk_used: :param total_memory_used: :return: """ - cpu_capacity = cluster_data_model.get_resource_from_id( + cpu_capacity = self.model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(element) - disk_capacity = cluster_data_model.get_resource_from_id( + disk_capacity = self.model.get_resource_from_id( resource.ResourceType.disk).get_capacity(element) - memory_capacity = cluster_data_model.get_resource_from_id( + memory_capacity = self.model.get_resource_from_id( resource.ResourceType.memory).get_capacity(element) score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) / @@ -275,20 +248,19 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): # todo(jed) take in account weight return (score_cores + score_disk + score_memory) / 3 - def calculate_score_node(self, hypervisor, model): - """calculate the score that represent the utilization level + def calculate_score_node(self, hypervisor): + """Calculate the score that represent the utilization level - :param hypervisor: - :param model: - :return: - """ + :param hypervisor: + :return: + """ resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname) host_avg_cpu_util = self.ceilometer. \ statistic_aggregation(resource_id=resource_id, meter_name=self.HOST_CPU_USAGE_METRIC_NAME, period="7200", - aggregate='avg' - ) + aggregate='avg') + if host_avg_cpu_util is None: LOG.error( _LE("No values returned by %(resource_id)s " @@ -298,14 +270,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): ) host_avg_cpu_util = 100 - cpu_capacity = model.get_resource_from_id( + cpu_capacity = self.model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(hypervisor) total_cores_used = cpu_capacity * (host_avg_cpu_util / 100) - return self.calculate_weight(model, hypervisor, total_cores_used, - 0, - 0) + return self.calculate_weight(hypervisor, total_cores_used, 0, 0) def calculate_migration_efficacy(self): """Calculate migration efficacy @@ -319,16 +289,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): else: return 0 - def calculate_score_vm(self, vm, cluster_data_model): + def calculate_score_vm(self, vm): """Calculate Score of virtual machine :param vm: the virtual machine - :param cluster_data_model: the cluster model + :param self.model: the cluster model :return: score """ - if cluster_data_model is None: - raise exception.ClusterStateNotDefined() - vm_cpu_utilization = self.ceilometer. \ statistic_aggregation( resource_id=vm.uuid, @@ -345,13 +312,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): ) vm_cpu_utilization = 100 - cpu_capacity = cluster_data_model.get_resource_from_id( + cpu_capacity = self.model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(vm) total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0) - return self.calculate_weight(cluster_data_model, vm, - total_cores_used, 0, 0) + return self.calculate_weight(vm, total_cores_used, 0, 0) def add_change_service_state(self, resource_id, state): parameters = {'state': state} @@ -371,48 +337,47 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): resource_id=resource_id, input_parameters=parameters) - def score_of_nodes(self, cluster_data_model, score): + def score_of_nodes(self, score): """Calculate score of nodes based on load by VMs""" - for hypervisor_id in cluster_data_model.get_all_hypervisors(): - hypervisor = cluster_data_model. \ + for hypervisor_id in self.model.get_all_hypervisors(): + hypervisor = self.model. \ get_hypervisor_from_id(hypervisor_id) - count = cluster_data_model.get_mapping(). \ + count = self.model.get_mapping(). \ get_node_vms_from_id(hypervisor_id) if len(count) > 0: - result = self.calculate_score_node(hypervisor, - cluster_data_model) + result = self.calculate_score_node(hypervisor) else: - ''' the hypervisor has not VMs ''' + # The hypervisor has not VMs result = 0 if len(count) > 0: score.append((hypervisor_id, result)) return score - def node_and_vm_score(self, sorted_score, score, current_model): + def node_and_vm_score(self, sorted_score, score): """Get List of VMs from Node""" node_to_release = sorted_score[len(score) - 1][0] - vms_to_mig = current_model.get_mapping().get_node_vms_from_id( + vms_to_mig = self.model.get_mapping().get_node_vms_from_id( node_to_release) vm_score = [] for vm_id in vms_to_mig: - vm = current_model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) if vm.state == vm_state.VMState.ACTIVE.value: vm_score.append( - (vm_id, self.calculate_score_vm(vm, current_model))) + (vm_id, self.calculate_score_vm(vm))) return node_to_release, vm_score - def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor, + def create_migration_vm(self, mig_vm, mig_src_hypervisor, mig_dst_hypervisor): - """Create migration VM """ - if current_model.get_mapping().migrate_vm( + """Create migration VM""" + if self.model.get_mapping().migrate_vm( mig_vm, mig_src_hypervisor, mig_dst_hypervisor): self.add_migration(mig_vm.uuid, 'live', mig_src_hypervisor.uuid, mig_dst_hypervisor.uuid) - if len(current_model.get_mapping().get_node_vms( + if len(self.model.get_mapping().get_node_vms( mig_src_hypervisor)) == 0: self.add_change_service_state(mig_src_hypervisor. uuid, @@ -420,24 +385,22 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): DISABLED.value) self.number_of_released_nodes += 1 - def calculate_num_migrations(self, sorted_vms, current_model, - node_to_release, sorted_score): + def calculate_num_migrations(self, sorted_vms, node_to_release, + sorted_score): number_migrations = 0 for vm in sorted_vms: for j in range(0, len(sorted_score)): - mig_vm = current_model.get_vm_from_id(vm[0]) - mig_src_hypervisor = current_model.get_hypervisor_from_id( + mig_vm = self.model.get_vm_from_id(vm[0]) + mig_src_hypervisor = self.model.get_hypervisor_from_id( node_to_release) - mig_dst_hypervisor = current_model.get_hypervisor_from_id( + mig_dst_hypervisor = self.model.get_hypervisor_from_id( sorted_score[j][0]) - result = self.check_migration(current_model, - mig_src_hypervisor, - mig_dst_hypervisor, mig_vm) + result = self.check_migration( + mig_src_hypervisor, mig_dst_hypervisor, mig_vm) if result: self.create_migration_vm( - current_model, mig_vm, - mig_src_hypervisor, mig_dst_hypervisor) + mig_vm, mig_src_hypervisor, mig_dst_hypervisor) number_migrations += 1 break return number_migrations @@ -450,28 +413,26 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): else: return unsuccessful_migration + 1 - def execute(self, original_model): + def pre_execute(self): LOG.info(_LI("Initializing Sercon Consolidation")) - - if original_model is None: + if self.model is None: raise exception.ClusterStateNotDefined() + def do_execute(self): # todo(jed) clone model - current_model = original_model - self.efficacy = 100 unsuccessful_migration = 0 first_migration = True - size_cluster = len(current_model.get_all_hypervisors()) + size_cluster = len(self.model.get_all_hypervisors()) if size_cluster == 0: raise exception.ClusterEmpty() self.compute_attempts(size_cluster) - for hypervisor_id in current_model.get_all_hypervisors(): - hypervisor = current_model.get_hypervisor_from_id(hypervisor_id) - count = current_model.get_mapping(). \ + for hypervisor_id in self.model.get_all_hypervisors(): + hypervisor = self.model.get_hypervisor_from_id(hypervisor_id) + count = self.model.get_mapping(). \ get_node_vms_from_id(hypervisor_id) if len(count) == 0: if hypervisor.state == hyper_state.HypervisorState.ENABLED: @@ -487,13 +448,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): first_migration = False score = [] - score = self.score_of_nodes(current_model, score) + score = self.score_of_nodes(score) - ''' sort compute nodes by Score decreasing ''''' + # Sort compute nodes by Score decreasing sorted_score = sorted(score, reverse=True, key=lambda x: (x[1])) - LOG.debug("Hypervisor(s) BFD {0}".format(sorted_score)) + LOG.debug("Hypervisor(s) BFD %s", sorted_score) - ''' get Node to be released ''' + # Get Node to be released if len(score) == 0: LOG.warning(_LW( "The workloads of the compute nodes" @@ -501,15 +462,15 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): break node_to_release, vm_score = self.node_and_vm_score( - sorted_score, score, current_model) + sorted_score, score) - ''' sort VMs by Score ''' + # Sort VMs by Score sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1])) # BFD: Best Fit Decrease - LOG.debug("VM(s) BFD {0}".format(sorted_vms)) + LOG.debug("VM(s) BFD %s", sorted_vms) migrations = self.calculate_num_migrations( - sorted_vms, current_model, node_to_release, sorted_score) + sorted_vms, node_to_release, sorted_score) unsuccessful_migration = self.unsuccessful_migration_actualization( migrations, unsuccessful_migration) @@ -519,6 +480,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): "efficacy": self.efficacy } LOG.debug(infos) - self.solution.model = current_model - self.solution.efficacy = self.efficacy - return self.solution + + def post_execute(self): + pass diff --git a/watcher/decision_engine/strategy/strategies/dummy_strategy.py b/watcher/decision_engine/strategy/strategies/dummy_strategy.py index 64b7c1caa..63d265e57 100644 --- a/watcher/decision_engine/strategy/strategies/dummy_strategy.py +++ b/watcher/decision_engine/strategy/strategies/dummy_strategy.py @@ -19,6 +19,7 @@ from oslo_log import log from watcher._i18n import _ +from watcher.common import exception from watcher.decision_engine.strategy.strategies import base LOG = log.getLogger(__name__) @@ -48,16 +49,11 @@ class DummyStrategy(base.DummyBaseStrategy): NOP = "nop" SLEEP = "sleep" - def __init__(self, config=None, osc=None): - """Dummy Strategy implemented for demo and testing purposes + def pre_execute(self): + if self.model is None: + raise exception.ClusterStateNotDefined() - :param config: A mapping containing the configuration of this strategy - :type config: dict - :param osc: :py:class:`~.OpenStackClients` instance - """ - super(DummyStrategy, self).__init__(config, osc) - - def execute(self, original_model): + def do_execute(self): LOG.debug("Executing Dummy strategy") parameters = {'message': 'hello World'} self.solution.add_action(action_type=self.NOP, @@ -69,7 +65,9 @@ class DummyStrategy(base.DummyBaseStrategy): self.solution.add_action(action_type=self.SLEEP, input_parameters={'duration': 5.0}) - return self.solution + + def post_execute(self): + pass @classmethod def get_name(cls): diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index 15d914cb4..0a72c9682 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -30,7 +30,7 @@ telemetries to measure thermal/workload status of server. from oslo_log import log -from watcher._i18n import _, _LE +from watcher._i18n import _, _LE, _LI from watcher.common import exception as wexc from watcher.decision_engine.model import resource from watcher.decision_engine.model import vm_state @@ -78,7 +78,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): MIGRATION = "migrate" - def __init__(self, config=None, osc=None): + def __init__(self, config, osc=None): """Outlet temperature control using live migration :param config: A mapping containing the configuration of this strategy @@ -116,26 +116,25 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def ceilometer(self, c): self._ceilometer = c - def calc_used_res(self, cluster_data_model, hypervisor, cpu_capacity, + def calc_used_res(self, hypervisor, cpu_capacity, memory_capacity, disk_capacity): - '''calculate the used vcpus, memory and disk based on VM flavors''' - vms = cluster_data_model.get_mapping().get_node_vms(hypervisor) + """Calculate the used vcpus, memory and disk based on VM flavors""" + vms = self.model.get_mapping().get_node_vms(hypervisor) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 if len(vms) > 0: for vm_id in vms: - vm = cluster_data_model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) vcpus_used += cpu_capacity.get_capacity(vm) memory_mb_used += memory_capacity.get_capacity(vm) disk_gb_used += disk_capacity.get_capacity(vm) return vcpus_used, memory_mb_used, disk_gb_used - def group_hosts_by_outlet_temp(self, cluster_data_model): + def group_hosts_by_outlet_temp(self): """Group hosts based on outlet temp meters""" - - hypervisors = cluster_data_model.get_all_hypervisors() + hypervisors = self.model.get_all_hypervisors() size_cluster = len(hypervisors) if size_cluster == 0: raise wexc.ClusterEmpty() @@ -143,7 +142,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): hosts_need_release = [] hosts_target = [] for hypervisor_id in hypervisors: - hypervisor = cluster_data_model.get_hypervisor_from_id( + hypervisor = self.model.get_hypervisor_from_id( hypervisor_id) resource_id = hypervisor.uuid @@ -166,37 +165,35 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): hosts_target.append(hvmap) return hosts_need_release, hosts_target - def choose_vm_to_migrate(self, cluster_data_model, hosts): - """pick up an active vm instance to migrate from provided hosts""" - + def choose_vm_to_migrate(self, hosts): + """Pick up an active vm instance to migrate from provided hosts""" for hvmap in hosts: mig_src_hypervisor = hvmap['hv'] - vms_of_src = cluster_data_model.get_mapping().get_node_vms( + vms_of_src = self.model.get_mapping().get_node_vms( mig_src_hypervisor) if len(vms_of_src) > 0: for vm_id in vms_of_src: try: # select the first active VM to migrate - vm = cluster_data_model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) if vm.state != vm_state.VMState.ACTIVE.value: LOG.info(_LE("VM not active, skipped: %s"), vm.uuid) continue return mig_src_hypervisor, vm except wexc.InstanceNotFound as e: - LOG.info("VM not found Error: %s" % e.message) - pass + LOG.exception(e) + LOG.info(_LI("VM not found")) return None - def filter_dest_servers(self, cluster_data_model, hosts, vm_to_migrate): + def filter_dest_servers(self, hosts, vm_to_migrate): """Only return hosts with sufficient available resources""" - - cpu_capacity = cluster_data_model.get_resource_from_id( + cpu_capacity = self.model.get_resource_from_id( resource.ResourceType.cpu_cores) - disk_capacity = cluster_data_model.get_resource_from_id( + disk_capacity = self.model.get_resource_from_id( resource.ResourceType.disk) - memory_capacity = cluster_data_model.get_resource_from_id( + memory_capacity = self.model.get_resource_from_id( resource.ResourceType.memory) required_cores = cpu_capacity.get_capacity(vm_to_migrate) @@ -209,8 +206,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): host = hvmap['hv'] # available cores_used, mem_used, disk_used = self.calc_used_res( - cluster_data_model, host, cpu_capacity, memory_capacity, - disk_capacity) + host, cpu_capacity, memory_capacity, disk_capacity) cores_available = cpu_capacity.get_capacity(host) - cores_used disk_available = disk_capacity.get_capacity(host) - disk_used mem_available = memory_capacity.get_capacity(host) - mem_used @@ -221,15 +217,14 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): return dest_servers - def execute(self, original_model): + def pre_execute(self): LOG.debug("Initializing Outlet temperature strategy") - if original_model is None: + if self.model is None: raise wexc.ClusterStateNotDefined() - current_model = original_model - hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp( - current_model) + def do_execute(self): + hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp() if len(hosts_need_release) == 0: # TODO(zhenzanz): return something right if there's no hot servers @@ -245,16 +240,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): reverse=True, key=lambda x: (x["outlet_temp"])) - vm_to_migrate = self.choose_vm_to_migrate(current_model, - hosts_need_release) + vm_to_migrate = self.choose_vm_to_migrate(hosts_need_release) # calculate the vm's cpu cores,memory,disk needs if vm_to_migrate is None: return self.solution mig_src_hypervisor, vm_src = vm_to_migrate - dest_servers = self.filter_dest_servers(current_model, - hosts_target, - vm_src) + dest_servers = self.filter_dest_servers(hosts_target, vm_src) # sort the filtered result by outlet temp # pick up the lowest one as dest server if len(dest_servers) == 0: @@ -267,9 +259,8 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): # always use the host with lowerest outlet temperature mig_dst_hypervisor = dest_servers[0]['hv'] # generate solution to migrate the vm to the dest server, - if current_model.get_mapping().migrate_vm(vm_src, - mig_src_hypervisor, - mig_dst_hypervisor): + if self.model.get_mapping().migrate_vm( + vm_src, mig_src_hypervisor, mig_dst_hypervisor): parameters = {'migration_type': 'live', 'src_hypervisor': mig_src_hypervisor.uuid, 'dst_hypervisor': mig_dst_hypervisor.uuid} @@ -277,6 +268,6 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): resource_id=vm_src.uuid, input_parameters=parameters) - self.solution.model = current_model - - return self.solution + def post_execute(self): + self.solution.model = self.model + # TODO(v-francoise): Add the indicators to the solution diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py index e8c6969bb..ca4ae6d58 100644 --- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py @@ -84,7 +84,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): https://github.com/openstack/watcher-specs/blob/master/specs/mitaka/implemented/zhaw-load-consolidation.rst """ # noqa - def __init__(self, config=None, osc=None): + def __init__(self, config, osc=None): super(VMWorkloadConsolidation, self).__init__(config, osc) self._ceilometer = None self.number_of_migrations = 0 @@ -121,8 +121,8 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): """ if isinstance(state, six.string_types): return state - elif (type(state) == hyper_state.HypervisorState or - type(state) == vm_state.VMState): + elif isinstance(state, (vm_state.VMState, + hyper_state.HypervisorState)): return state.value else: LOG.error(_LE('Unexpexted resource state type, ' @@ -171,12 +171,10 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): vm_state_str = self.get_state_str(vm.state) if vm_state_str != vm_state.VMState.ACTIVE.value: - ''' - Watcher curently only supports live VM migration and block live - VM migration which both requires migrated VM to be active. - When supported, the cold migration may be used as a fallback - migration mechanism to move non active VMs. - ''' + # Watcher curently only supports live VM migration and block live + # VM migration which both requires migrated VM to be active. + # When supported, the cold migration may be used as a fallback + # migration mechanism to move non active VMs. LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, ' 'state=%(vm_state)s.'), vm_uuid=vm_uuid, @@ -209,13 +207,13 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): if len(model.get_mapping().get_node_vms(hypervisor)) == 0: self.add_action_deactivate_hypervisor(hypervisor) - def get_prediction_model(self, model): + def get_prediction_model(self): """Return a deepcopy of a model representing current cluster state. :param model: model_root object :return: model_root object """ - return deepcopy(model) + return deepcopy(self.model) def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'): """Collect cpu, ram and disk utilization statistics of a VM. @@ -334,7 +332,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): :param model: model_root object :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>} """ - hypervisors = model.get_all_hypervisors().values() + hypervisors = self.model.get_all_hypervisors().values() rcu = {} counters = {} for hypervisor in hypervisors: @@ -452,9 +450,11 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): key=lambda x: self.get_hypervisor_utilization(x, model)['cpu']) for hypervisor in reversed(sorted_hypervisors): if self.is_overloaded(hypervisor, model, cc): - for vm in sorted(model.get_mapping().get_node_vms(hypervisor), - key=lambda x: self.get_vm_utilization( - x, model)['cpu']): + for vm in sorted( + model.get_mapping().get_node_vms(hypervisor), + key=lambda x: self.get_vm_utilization( + x, model)['cpu'] + ): for dst_hypervisor in reversed(sorted_hypervisors): if self.vm_fits(vm, dst_hypervisor, model, cc): self.add_migration(vm, hypervisor, @@ -498,7 +498,11 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): dsc -= 1 asc += 1 - def execute(self, original_model): + def pre_execute(self): + if self.model is None: + raise exception.ClusterStateNotDefined() + + def do_execute(self): """Execute strategy. This strategy produces a solution resulting in more @@ -513,7 +517,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): :param original_model: root_model object """ LOG.info(_LI('Executing Smart Strategy')) - model = self.get_prediction_model(original_model) + model = self.get_prediction_model(self.model) rcu = self.get_relative_cluster_utilization(model) self.ceilometer_vm_data_cache = dict() @@ -545,4 +549,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): self.solution.model = model self.solution.efficacy = rcu_after['cpu'] - return self.solution + def post_execute(self): + # TODO(v-francoise): Add the indicators to the solution + pass diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index efaab1c86..507047e5b 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -28,7 +28,7 @@ from watcher.metrics_engine.cluster_history import ceilometer as ceil LOG = log.getLogger(__name__) -class WorkloadBalance(base.BaseStrategy): +class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): """[PoC]Workload balance using live migration *Description* @@ -68,10 +68,12 @@ class WorkloadBalance(base.BaseStrategy): MIGRATION = "migrate" - def __init__(self, config=None, osc=None): - """Using live migration + def __init__(self, config, osc=None): + """Workload balance using live migration - :param osc: an OpenStackClients object + :param config: A mapping containing the configuration of this strategy + :type config: :py:class:`~.Struct` instance + :param osc: :py:class:`~.OpenStackClients` instance """ super(WorkloadBalance, self).__init__(config, osc) # the migration plan will be triggered when the CPU utlization % @@ -104,44 +106,32 @@ class WorkloadBalance(base.BaseStrategy): def get_translatable_display_name(cls): return "workload balance migration strategy" - @classmethod - def get_goal_name(cls): - return "WORKLOAD_OPTIMIZATION" - - @classmethod - def get_goal_display_name(cls): - return _("Workload optimization") - - @classmethod - def get_translatable_goal_display_name(cls): - return "Workload optimization" - - def calculate_used_resource(self, model, hypervisor, cap_cores, cap_mem, + def calculate_used_resource(self, hypervisor, cap_cores, cap_mem, cap_disk): - '''calculate the used vcpus, memory and disk based on VM flavors''' - vms = model.get_mapping().get_node_vms(hypervisor) + """Calculate the used vcpus, memory and disk based on VM flavors""" + vms = self.model.get_mapping().get_node_vms(hypervisor) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 for vm_id in vms: - vm = model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) vcpus_used += cap_cores.get_capacity(vm) memory_mb_used += cap_mem.get_capacity(vm) disk_gb_used += cap_disk.get_capacity(vm) return vcpus_used, memory_mb_used, disk_gb_used - def choose_vm_to_migrate(self, model, hosts, avg_workload, workload_cache): - """pick up an active vm instance to migrate from provided hosts + def choose_vm_to_migrate(self, hosts, avg_workload, workload_cache): + """Pick up an active vm instance to migrate from provided hosts - :param model: it's the origin_model passed from 'execute' function :param hosts: the array of dict which contains hypervisor object :param avg_workload: the average workload value of all hypervisors :param workload_cache: the map contains vm to workload mapping """ for hvmap in hosts: source_hypervisor = hvmap['hv'] - source_vms = model.get_mapping().get_node_vms(source_hypervisor) + source_vms = self.model.get_mapping().get_node_vms( + source_hypervisor) if source_vms: delta_workload = hvmap['workload'] - avg_workload min_delta = 1000000 @@ -149,7 +139,7 @@ class WorkloadBalance(base.BaseStrategy): for vm_id in source_vms: try: # select the first active VM to migrate - vm = model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) if vm.state != vm_state.VMState.ACTIVE.value: LOG.debug("VM not active, skipped: %s", vm.uuid) @@ -161,18 +151,20 @@ class WorkloadBalance(base.BaseStrategy): except wexc.InstanceNotFound: LOG.error(_LE("VM not found Error: %s"), vm_id) if instance_id: - return source_hypervisor, model.get_vm_from_id(instance_id) + return source_hypervisor, self.model.get_vm_from_id( + instance_id) else: LOG.info(_LI("VM not found from hypervisor: %s"), source_hypervisor.uuid) - def filter_destination_hosts(self, model, hosts, vm_to_migrate, + def filter_destination_hosts(self, hosts, vm_to_migrate, avg_workload, workload_cache): '''Only return hosts with sufficient available resources''' - cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) - cap_disk = model.get_resource_from_id(resource.ResourceType.disk) - cap_mem = model.get_resource_from_id(resource.ResourceType.memory) + cap_cores = self.model.get_resource_from_id( + resource.ResourceType.cpu_cores) + cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk) + cap_mem = self.model.get_resource_from_id(resource.ResourceType.memory) required_cores = cap_cores.get_capacity(vm_to_migrate) required_disk = cap_disk.get_capacity(vm_to_migrate) @@ -186,20 +178,22 @@ class WorkloadBalance(base.BaseStrategy): workload = hvmap['workload'] # calculate the available resources cores_used, mem_used, disk_used = self.calculate_used_resource( - model, host, cap_cores, cap_mem, cap_disk) + host, cap_cores, cap_mem, cap_disk) cores_available = cap_cores.get_capacity(host) - cores_used disk_available = cap_disk.get_capacity(host) - disk_used mem_available = cap_mem.get_capacity(host) - mem_used - if (cores_available >= required_cores and - disk_available >= required_disk and - mem_available >= required_mem and - (src_vm_workload + workload) < self.threshold / 100 * - cap_cores.get_capacity(host)): + if ( + cores_available >= required_cores and + disk_available >= required_disk and + mem_available >= required_mem and + (src_vm_workload + workload) < self.threshold / 100 * + cap_cores.get_capacity(host) + ): destination_hosts.append(hvmap) return destination_hosts - def group_hosts_by_cpu_util(self, model): + def group_hosts_by_cpu_util(self): """Calculate the workloads of each hypervisor try to find out the hypervisors which have reached threshold @@ -208,12 +202,13 @@ class WorkloadBalance(base.BaseStrategy): and also generate the VM workload map. """ - hypervisors = model.get_all_hypervisors() + hypervisors = self.model.get_all_hypervisors() cluster_size = len(hypervisors) if not hypervisors: raise wexc.ClusterEmpty() # get cpu cores capacity of hypervisors and vms - cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) + cap_cores = self.model.get_resource_from_id( + resource.ResourceType.cpu_cores) overload_hosts = [] nonoverload_hosts = [] # total workload of cluster @@ -222,19 +217,20 @@ class WorkloadBalance(base.BaseStrategy): # use workload_cache to store the workload of VMs for reuse purpose workload_cache = {} for hypervisor_id in hypervisors: - hypervisor = model.get_hypervisor_from_id(hypervisor_id) - vms = model.get_mapping().get_node_vms(hypervisor) + hypervisor = self.model.get_hypervisor_from_id(hypervisor_id) + vms = self.model.get_mapping().get_node_vms(hypervisor) hypervisor_workload = 0.0 for vm_id in vms: - vm = model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) try: cpu_util = self.ceilometer.statistic_aggregation( resource_id=vm_id, meter_name=self._meter, period=self._period, aggregate='avg') - except Exception as e: - LOG.error(_LE("Can not get cpu_util: %s"), e.message) + except Exception as exc: + LOG.exception(exc) + LOG.error(_LE("Can not get cpu_util")) continue if cpu_util is None: LOG.debug("%s: cpu_util is None", vm_id) @@ -260,15 +256,23 @@ class WorkloadBalance(base.BaseStrategy): return overload_hosts, nonoverload_hosts, avg_workload, workload_cache - def execute(self, origin_model): + def pre_execute(self): + """Pre-execution phase + + This can be used to fetch some pre-requisites or data. + """ LOG.info(_LI("Initializing Workload Balance Strategy")) - if origin_model is None: + if self.model is None: raise wexc.ClusterStateNotDefined() - current_model = origin_model + def do_execute(self): + """Strategy execution phase + + This phase is where you should put the main logic of your strategy. + """ src_hypervisors, target_hypervisors, avg_workload, workload_cache = ( - self.group_hosts_by_cpu_util(current_model)) + self.group_hosts_by_cpu_util()) if not src_hypervisors: LOG.debug("No hosts require optimization") @@ -286,19 +290,14 @@ class WorkloadBalance(base.BaseStrategy): reverse=True, key=lambda x: (x[self.METER_NAME])) - vm_to_migrate = self.choose_vm_to_migrate(current_model, - src_hypervisors, - avg_workload, - workload_cache) + vm_to_migrate = self.choose_vm_to_migrate( + src_hypervisors, avg_workload, workload_cache) if not vm_to_migrate: return self.solution source_hypervisor, vm_src = vm_to_migrate # find the hosts that have enough resource for the VM to be migrated - destination_hosts = self.filter_destination_hosts(current_model, - target_hypervisors, - vm_src, - avg_workload, - workload_cache) + destination_hosts = self.filter_destination_hosts( + target_hypervisors, vm_src, avg_workload, workload_cache) # sort the filtered result by workload # pick up the lowest one as dest server if not destination_hosts: @@ -311,14 +310,18 @@ class WorkloadBalance(base.BaseStrategy): # always use the host with lowerest CPU utilization mig_dst_hypervisor = destination_hosts[0]['hv'] # generate solution to migrate the vm to the dest server, - if current_model.get_mapping().migrate_vm(vm_src, - source_hypervisor, - mig_dst_hypervisor): + if self.model.get_mapping().migrate_vm(vm_src, source_hypervisor, + mig_dst_hypervisor): parameters = {'migration_type': 'live', 'src_hypervisor': source_hypervisor.uuid, 'dst_hypervisor': mig_dst_hypervisor.uuid} self.solution.add_action(action_type=self.MIGRATION, resource_id=vm_src.uuid, input_parameters=parameters) - self.solution.model = current_model - return self.solution + + def post_execute(self): + """Post-execution phase + + This can be used to compute the global efficacy + """ + self.solution.model = self.model diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index b10fe8385..b7a2a16e6 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -108,7 +108,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): MIGRATION = "migrate" MEMOIZE = _set_memoize(CONF) - def __init__(self, config=None, osc=None): + def __init__(self, config, osc=None): super(WorkloadStabilization, self).__init__(config, osc) self._ceilometer = None self._nova = None @@ -164,17 +164,16 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus)) @MEMOIZE - def get_vm_load(self, vm_uuid, current_model): + def get_vm_load(self, vm_uuid): """Gathering vm load through ceilometer statistic. :param vm_uuid: vm for which statistic is gathered. - :param current_model: the cluster model :return: dict """ LOG.debug(_LI('get_vm_load started')) - vm_vcpus = current_model.get_resource_from_id( + vm_vcpus = self.model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity( - current_model.get_vm_from_id(vm_uuid)) + self.model.get_vm_from_id(vm_uuid)) vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus} for meter in self.metrics: avg_meter = self.ceilometer.statistic_aggregation( @@ -189,25 +188,25 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): vm_load[meter] = avg_meter return vm_load - def normalize_hosts_load(self, hosts, current_model): + def normalize_hosts_load(self, hosts): normalized_hosts = deepcopy(hosts) for host in normalized_hosts: if 'memory.resident' in normalized_hosts[host]: - h_memory = current_model.get_resource_from_id( + h_memory = self.model.get_resource_from_id( resource.ResourceType.memory).get_capacity( - current_model.get_hypervisor_from_id(host)) + self.model.get_hypervisor_from_id(host)) normalized_hosts[host]['memory.resident'] /= float(h_memory) return normalized_hosts - def get_hosts_load(self, current_model): + def get_hosts_load(self): """Get load of every host by gathering vms load""" hosts_load = {} - for hypervisor_id in current_model.get_all_hypervisors(): + for hypervisor_id in self.model.get_all_hypervisors(): hosts_load[hypervisor_id] = {} - host_vcpus = current_model.get_resource_from_id( + host_vcpus = self.model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity( - current_model.get_hypervisor_from_id(hypervisor_id)) + self.model.get_hypervisor_from_id(hypervisor_id)) hosts_load[hypervisor_id]['vcpus'] = host_vcpus for metric in self.metrics: @@ -250,8 +249,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): " for %s in weight dict.") % metric) return weighted_sd - def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id, - current_model): + def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id): """Calculate migration case Return list of standard deviation values, that appearing in case of @@ -260,12 +258,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): :param vm_id: the virtual machine :param src_hp_id: the source hypervisor id :param dst_hp_id: the destination hypervisor id - :param current_model: the cluster model :return: list of standard deviation values """ migration_case = [] new_hosts = deepcopy(hosts) - vm_load = self.get_vm_load(vm_id, current_model) + vm_load = self.get_vm_load(vm_id) d_host_vcpus = new_hosts[dst_hp_id]['vcpus'] s_host_vcpus = new_hosts[src_hp_id]['vcpus'] for metric in self.metrics: @@ -279,13 +276,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): else: new_hosts[src_hp_id][metric] -= vm_load[metric] new_hosts[dst_hp_id][metric] += vm_load[metric] - normalized_hosts = self.normalize_hosts_load(new_hosts, current_model) + normalized_hosts = self.normalize_hosts_load(new_hosts) for metric in self.metrics: migration_case.append(self.get_sd(normalized_hosts, metric)) migration_case.append(new_hosts) return migration_case - def simulate_migrations(self, current_model, hosts): + def simulate_migrations(self, hosts): """Make sorted list of pairs vm:dst_host""" def yield_hypervisors(hypervisors): ct = CONF['watcher_strategies.workload_stabilization'].retry_count @@ -300,23 +297,22 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): yield hypervisors vm_host_map = [] - for source_hp_id in current_model.get_all_hypervisors(): - hypervisors = list(current_model.get_all_hypervisors()) + for source_hp_id in self.model.get_all_hypervisors(): + hypervisors = list(self.model.get_all_hypervisors()) hypervisors.remove(source_hp_id) hypervisor_list = yield_hypervisors(hypervisors) - vms_id = current_model.get_mapping(). \ + vms_id = self.model.get_mapping(). \ get_node_vms_from_id(source_hp_id) for vm_id in vms_id: min_sd_case = {'value': len(self.metrics)} - vm = current_model.get_vm_from_id(vm_id) + vm = self.model.get_vm_from_id(vm_id) if vm.state not in [vm_state.VMState.ACTIVE.value, vm_state.VMState.PAUSED.value]: continue for dst_hp_id in next(hypervisor_list): sd_case = self.calculate_migration_case(hosts, vm_id, source_hp_id, - dst_hp_id, - current_model) + dst_hp_id) weighted_sd = self.calculate_weighted_sd(sd_case[:-1]) @@ -327,14 +323,14 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): break return sorted(vm_host_map, key=lambda x: x['value']) - def check_threshold(self, current_model): + def check_threshold(self): """Check if cluster is needed in balancing""" - hosts_load = self.get_hosts_load(current_model) - normalized_load = self.normalize_hosts_load(hosts_load, current_model) + hosts_load = self.get_hosts_load() + normalized_load = self.normalize_hosts_load(hosts_load) for metric in self.metrics: metric_sd = self.get_sd(normalized_load, metric) if metric_sd > float(self.thresholds[metric]): - return self.simulate_migrations(current_model, hosts_load) + return self.simulate_migrations(hosts_load) def add_migration(self, resource_id, @@ -348,58 +344,57 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): resource_id=resource_id, input_parameters=parameters) - def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor, + def create_migration_vm(self, mig_vm, mig_src_hypervisor, mig_dst_hypervisor): """Create migration VM """ - if current_model.get_mapping().migrate_vm( + if self.model.get_mapping().migrate_vm( mig_vm, mig_src_hypervisor, mig_dst_hypervisor): self.add_migration(mig_vm.uuid, 'live', mig_src_hypervisor.uuid, mig_dst_hypervisor.uuid) - def migrate(self, current_model, vm_uuid, src_host, dst_host): - mig_vm = current_model.get_vm_from_id(vm_uuid) - mig_src_hypervisor = current_model.get_hypervisor_from_id(src_host) - mig_dst_hypervisor = current_model.get_hypervisor_from_id(dst_host) - self.create_migration_vm(current_model, mig_vm, mig_src_hypervisor, + def migrate(self, vm_uuid, src_host, dst_host): + mig_vm = self.model.get_vm_from_id(vm_uuid) + mig_src_hypervisor = self.model.get_hypervisor_from_id(src_host) + mig_dst_hypervisor = self.model.get_hypervisor_from_id(dst_host) + self.create_migration_vm(mig_vm, mig_src_hypervisor, mig_dst_hypervisor) - def fill_solution(self, current_model): - self.solution.model = current_model + def fill_solution(self): + self.solution.model = self.model self.solution.efficacy = 100 return self.solution - def execute(self, orign_model): + def pre_execute(self): LOG.info(_LI("Initializing Workload Stabilization")) - current_model = orign_model - if orign_model is None: + if self.model is None: raise exception.ClusterStateNotDefined() - migration = self.check_threshold(current_model) + def do_execute(self): + migration = self.check_threshold() if migration: - hosts_load = self.get_hosts_load(current_model) + hosts_load = self.get_hosts_load() min_sd = 1 balanced = False for vm_host in migration: - dst_hp_disk = current_model.get_resource_from_id( + dst_hp_disk = self.model.get_resource_from_id( resource.ResourceType.disk).get_capacity( - current_model.get_hypervisor_from_id(vm_host['host'])) - vm_disk = current_model.get_resource_from_id( + self.model.get_hypervisor_from_id(vm_host['host'])) + vm_disk = self.model.get_resource_from_id( resource.ResourceType.disk).get_capacity( - current_model.get_vm_from_id(vm_host['vm'])) + self.model.get_vm_from_id(vm_host['vm'])) if vm_disk > dst_hp_disk: continue vm_load = self.calculate_migration_case(hosts_load, vm_host['vm'], vm_host['s_host'], - vm_host['host'], - current_model) + vm_host['host']) weighted_sd = self.calculate_weighted_sd(vm_load[:-1]) if weighted_sd < min_sd: min_sd = weighted_sd hosts_load = vm_load[-1] - self.migrate(current_model, vm_host['vm'], + self.migrate(vm_host['vm'], vm_host['s_host'], vm_host['host']) for metric, value in zip(self.metrics, vm_load[:-1]): @@ -408,4 +403,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): break if balanced: break - return self.fill_solution(current_model) + return self.fill_solution() + + def post_execute(self): + """Post-execution phase + + This can be used to compute the global efficacy + """ + self.solution.model = self.model diff --git a/watcher/metrics_engine/cluster_model_collector/manager.py b/watcher/metrics_engine/cluster_model_collector/manager.py index a65d0e0bc..fdad56838 100644 --- a/watcher/metrics_engine/cluster_model_collector/manager.py +++ b/watcher/metrics_engine/cluster_model_collector/manager.py @@ -27,6 +27,7 @@ CONF = cfg.CONF class CollectorManager(object): + def get_cluster_model_collector(self, osc=None): """:param osc: an OpenStackClients instance""" nova = nova_helper.NovaHelper(osc=osc) diff --git a/watcher/tests/decision_engine/planner/test_default_planner.py b/watcher/tests/decision_engine/planner/test_default_planner.py index 9cb46122d..d27ab7999 100644 --- a/watcher/tests/decision_engine/planner/test_default_planner.py +++ b/watcher/tests/decision_engine/planner/test_default_planner.py @@ -36,10 +36,11 @@ class SolutionFaker(object): def build(): metrics = fake.FakerMetricsCollector() current_state_cluster = faker_cluster_state.FakerModelCollector() - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.\ - MagicMock(get_statistics=metrics.mock_get_statistics) - return sercon.execute(current_state_cluster.generate_scenario_1()) + sercon = strategies.BasicConsolidation(config=mock.Mock()) + sercon._model = current_state_cluster.generate_scenario_1() + sercon.ceilometer = mock.MagicMock( + get_statistics=metrics.mock_get_statistics) + return sercon.execute() class SolutionFakerSingleHyp(object): @@ -47,12 +48,12 @@ class SolutionFakerSingleHyp(object): def build(): metrics = fake.FakerMetricsCollector() current_state_cluster = faker_cluster_state.FakerModelCollector() - sercon = strategies.BasicConsolidation() - sercon.ceilometer = \ - mock.MagicMock(get_statistics=metrics.mock_get_statistics) - - return sercon.execute( + sercon = strategies.BasicConsolidation(config=mock.Mock()) + sercon._model = ( current_state_cluster.generate_scenario_3_with_2_hypervisors()) + sercon.ceilometer = mock.MagicMock( + get_statistics=metrics.mock_get_statistics) + return sercon.execute() class TestActionScheduling(base.DbTestCase): diff --git a/watcher/tests/decision_engine/strategy/context/test_strategy_context.py b/watcher/tests/decision_engine/strategy/context/test_strategy_context.py index bdfd67c1d..42b335a8f 100644 --- a/watcher/tests/decision_engine/strategy/context/test_strategy_context.py +++ b/watcher/tests/decision_engine/strategy/context/test_strategy_context.py @@ -13,6 +13,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. + import mock from watcher.decision_engine.solution.default import DefaultSolution @@ -20,10 +21,7 @@ from watcher.decision_engine.strategy.context.default import \ DefaultStrategyContext from watcher.decision_engine.strategy.selection.default import \ DefaultStrategySelector -from watcher.decision_engine.strategy.strategies.dummy_strategy import \ - DummyStrategy -from watcher.metrics_engine.cluster_model_collector.manager import \ - CollectorManager +from watcher.decision_engine.strategy.strategies import dummy_strategy from watcher.tests.db import base from watcher.tests.objects import utils as obj_utils @@ -32,18 +30,19 @@ class TestStrategyContext(base.DbTestCase): def setUp(self): super(TestStrategyContext, self).setUp() obj_utils.create_test_goal(self.context, id=1, name="DUMMY") - audit_template = obj_utils.create_test_audit_template( - self.context) + audit_template = obj_utils.create_test_audit_template(self.context) self.audit = obj_utils.create_test_audit( self.context, audit_template_id=audit_template.id) strategy_context = DefaultStrategyContext() + @mock.patch.object(dummy_strategy.DummyStrategy, 'model', + new_callable=mock.PropertyMock) @mock.patch.object(DefaultStrategySelector, 'select') - @mock.patch.object(CollectorManager, "get_cluster_model_collector", - mock.Mock()) - def test_execute_strategy(self, mock_call): - mock_call.return_value = DummyStrategy() + def test_execute_strategy(self, mock_call, m_model): + m_model.return_value = mock.Mock() + mock_call.return_value = dummy_strategy.DummyStrategy( + config=mock.Mock()) solution = self.strategy_context.execute_strategy( self.audit.uuid, self.context) self.assertIsInstance(solution, DefaultSolution) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py index 3f925c429..aefca9e20 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py @@ -31,11 +31,30 @@ from watcher.tests.decision_engine.strategy.strategies \ class TestBasicConsolidation(base.BaseTestCase): - # fake metrics - fake_metrics = faker_metrics_collector.FakerMetricsCollector() - # fake cluster - fake_cluster = faker_cluster_state.FakerModelCollector() + def setUp(self): + super(TestBasicConsolidation, self).setUp() + # fake metrics + self.fake_metrics = faker_metrics_collector.FakerMetricsCollector() + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + p_model = mock.patch.object( + strategies.BasicConsolidation, "model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + p_ceilometer = mock.patch.object( + strategies.BasicConsolidation, "ceilometer", + new_callable=mock.PropertyMock) + self.m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + + self.m_model.return_value = model_root.ModelRoot() + self.m_ceilometer.return_value = mock.Mock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.strategy = strategies.BasicConsolidation(config=mock.Mock()) def test_cluster_size(self): size_cluster = len( @@ -44,134 +63,98 @@ class TestBasicConsolidation(base.BaseTestCase): self.assertEqual(size_cluster_assert, size_cluster) def test_basic_consolidation_score_hypervisor(self): - cluster = self.fake_cluster.generate_scenario_1() - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model node_1_score = 0.023333333333333317 - self.assertEqual(node_1_score, sercon.calculate_score_node( - cluster.get_hypervisor_from_id("Node_1"), - cluster)) + self.assertEqual(node_1_score, self.strategy.calculate_score_node( + model.get_hypervisor_from_id("Node_1"))) node_2_score = 0.26666666666666666 - self.assertEqual(node_2_score, sercon.calculate_score_node( - cluster.get_hypervisor_from_id("Node_2"), - cluster)) + self.assertEqual(node_2_score, self.strategy.calculate_score_node( + model.get_hypervisor_from_id("Node_2"))) node_0_score = 0.023333333333333317 - self.assertEqual(node_0_score, sercon.calculate_score_node( - cluster.get_hypervisor_from_id("Node_0"), - cluster)) + self.assertEqual(node_0_score, self.strategy.calculate_score_node( + model.get_hypervisor_from_id("Node_0"))) def test_basic_consolidation_score_vm(self): - cluster = self.fake_cluster.generate_scenario_1() - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - vm_0 = cluster.get_vm_from_id("VM_0") + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + vm_0 = model.get_vm_from_id("VM_0") vm_0_score = 0.023333333333333317 - self.assertEqual(vm_0_score, sercon.calculate_score_vm(vm_0, cluster)) + self.assertEqual(vm_0_score, self.strategy.calculate_score_vm(vm_0)) - vm_1 = cluster.get_vm_from_id("VM_1") + vm_1 = model.get_vm_from_id("VM_1") vm_1_score = 0.023333333333333317 - self.assertEqual(vm_1_score, sercon.calculate_score_vm(vm_1, cluster)) - vm_2 = cluster.get_vm_from_id("VM_2") + self.assertEqual(vm_1_score, self.strategy.calculate_score_vm(vm_1)) + vm_2 = model.get_vm_from_id("VM_2") vm_2_score = 0.033333333333333326 - self.assertEqual(vm_2_score, sercon.calculate_score_vm(vm_2, cluster)) - vm_6 = cluster.get_vm_from_id("VM_6") + self.assertEqual(vm_2_score, self.strategy.calculate_score_vm(vm_2)) + vm_6 = model.get_vm_from_id("VM_6") vm_6_score = 0.02666666666666669 - self.assertEqual(vm_6_score, sercon.calculate_score_vm(vm_6, cluster)) - vm_7 = cluster.get_vm_from_id("VM_7") + self.assertEqual(vm_6_score, self.strategy.calculate_score_vm(vm_6)) + vm_7 = model.get_vm_from_id("VM_7") vm_7_score = 0.013333333333333345 - self.assertEqual(vm_7_score, sercon.calculate_score_vm(vm_7, cluster)) + self.assertEqual(vm_7_score, self.strategy.calculate_score_vm(vm_7)) def test_basic_consolidation_score_vm_disk(self): - cluster = self.fake_cluster.generate_scenario_5_with_vm_disk_0() - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - vm_0 = cluster.get_vm_from_id("VM_0") + model = self.fake_cluster.generate_scenario_5_with_vm_disk_0() + self.m_model.return_value = model + vm_0 = model.get_vm_from_id("VM_0") vm_0_score = 0.023333333333333355 - self.assertEqual(vm_0_score, sercon.calculate_score_vm(vm_0, cluster)) + self.assertEqual(vm_0_score, self.strategy.calculate_score_vm(vm_0, )) def test_basic_consolidation_weight(self): - cluster = self.fake_cluster.generate_scenario_1() - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - vm_0 = cluster.get_vm_from_id("VM_0") + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + vm_0 = model.get_vm_from_id("VM_0") cores = 16 # 80 Go disk = 80 # mem 8 Go mem = 8 vm_0_weight_assert = 3.1999999999999997 - self.assertEqual(vm_0_weight_assert, - sercon.calculate_weight(cluster, vm_0, cores, disk, - mem)) + self.assertEqual( + vm_0_weight_assert, + self.strategy.calculate_weight(vm_0, cores, disk, mem)) def test_calculate_migration_efficacy(self): - sercon = strategies.BasicConsolidation() - sercon.calculate_migration_efficacy() + self.strategy.calculate_migration_efficacy() def test_exception_model(self): - sercon = strategies.BasicConsolidation() - self.assertRaises(exception.ClusterStateNotDefined, sercon.execute, - None) + self.m_model.return_value = None + self.assertRaises( + exception.ClusterStateNotDefined, self.strategy.execute) def test_exception_cluster_empty(self): - sercon = strategies.BasicConsolidation() model = model_root.ModelRoot() - self.assertRaises(exception.ClusterEmpty, sercon.execute, - model) - - def test_calculate_score_vm_raise_cluster_state_not_found(self): - metrics = faker_metrics_collector.FakerMetricsCollector() - metrics.empty_one_metric("CPU_COMPUTE") - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - - self.assertRaises(exception.ClusterStateNotDefined, - sercon.calculate_score_vm, "VM_1", None) + self.m_model.return_value = model + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) def test_check_migration(self): - sercon = strategies.BasicConsolidation() - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() + self.m_model.return_value = model all_vms = model.get_all_vms() all_hyps = model.get_all_hypervisors() vm0 = all_vms[list(all_vms.keys())[0]] hyp0 = all_hyps[list(all_hyps.keys())[0]] - sercon.check_migration(model, hyp0, hyp0, vm0) + self.strategy.check_migration(hyp0, hyp0, vm0) def test_threshold(self): - sercon = strategies.BasicConsolidation() - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() + self.m_model.return_value = model all_hyps = model.get_all_hypervisors() hyp0 = all_hyps[list(all_hyps.keys())[0]] - sercon.check_threshold(model, hyp0, 1000, 1000, 1000) - - threshold_cores = sercon.get_threshold_cores() - sercon.set_threshold_cores(threshold_cores + 1) - self.assertEqual(threshold_cores + 1, sercon.get_threshold_cores()) - - def test_number_of(self): - sercon = strategies.BasicConsolidation() - sercon.get_number_of_released_nodes() - sercon.get_number_of_migrations() + self.assertFalse(self.strategy.check_threshold( + hyp0, 1000, 1000, 1000)) def test_basic_consolidation_migration(self): - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() + self.m_model.return_value = model - solution = sercon.execute( - self.fake_cluster.generate_scenario_3_with_2_hypervisors()) + solution = self.strategy.execute() actions_counter = collections.Counter( [action.get('action_type') for action in solution.actions]) @@ -187,27 +170,22 @@ class TestBasicConsolidation(base.BaseTestCase): # calculate_weight def test_execute_no_workload(self): - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) + model = ( + self.fake_cluster + .generate_scenario_4_with_1_hypervisor_no_vm()) + self.m_model.return_value = model - current_state_cluster = faker_cluster_state.FakerModelCollector() - model = current_state_cluster. \ - generate_scenario_4_with_1_hypervisor_no_vm() - - with mock.patch.object(strategies.BasicConsolidation, - 'calculate_weight') \ - as mock_score_call: + with mock.patch.object( + strategies.BasicConsolidation, 'calculate_weight' + ) as mock_score_call: mock_score_call.return_value = 0 - solution = sercon.execute(model) - self.assertEqual(100, solution.efficacy) + solution = self.strategy.execute() + self.assertEqual(0, solution.efficacy) def test_check_parameters(self): - sercon = strategies.BasicConsolidation() - sercon.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - solution = sercon.execute( - self.fake_cluster.generate_scenario_3_with_2_hypervisors()) + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() + self.m_model.return_value = model + solution = self.strategy.execute() loader = default.DefaultActionLoader() for action in solution.actions: loaded_action = loader.load(action['action_type']) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py b/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py index fae80e1ac..a4ecd5345 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py @@ -14,7 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock + from watcher.applier.actions.loading import default +from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base from watcher.tests.decision_engine.strategy.strategies import \ @@ -22,18 +25,33 @@ from watcher.tests.decision_engine.strategy.strategies import \ class TestDummyStrategy(base.TestCase): + + def setUp(self): + super(TestDummyStrategy, self).setUp() + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + p_model = mock.patch.object( + strategies.DummyStrategy, "model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + self.m_model.return_value = model_root.ModelRoot() + self.strategy = strategies.DummyStrategy(config=mock.Mock()) + + self.m_model.return_value = model_root.ModelRoot() + self.strategy = strategies.DummyStrategy(config=mock.Mock()) + def test_dummy_strategy(self): - dummy = strategies.DummyStrategy() - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() - solution = dummy.execute(model) + dummy = strategies.DummyStrategy(config=mock.Mock()) + solution = dummy.execute() self.assertEqual(3, len(solution.actions)) def test_check_parameters(self): - dummy = strategies.DummyStrategy() - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() - solution = dummy.execute(model) + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() + self.m_model.return_value = model + solution = self.strategy.execute() loader = default.DefaultActionLoader() for action in solution.actions: loaded_action = loader.load(action['action_type']) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py index 9578f1c52..ccb9bf8e6 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py @@ -32,93 +32,95 @@ from watcher.tests.decision_engine.strategy.strategies \ class TestOutletTempControl(base.BaseTestCase): - # fake metrics - fake_metrics = faker_metrics_collector.FakerMetricsCollector() - # fake cluster - fake_cluster = faker_cluster_state.FakerModelCollector() + def setUp(self): + super(TestOutletTempControl, self).setUp() + # fake metrics + self.fake_metrics = faker_metrics_collector.FakerMetricsCollector() + + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + p_model = mock.patch.object( + strategies.OutletTempControl, "model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + p_ceilometer = mock.patch.object( + strategies.OutletTempControl, "ceilometer", + new_callable=mock.PropertyMock) + self.m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + + self.m_model.return_value = model_root.ModelRoot() + self.m_ceilometer.return_value = mock.Mock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.strategy = strategies.OutletTempControl(config=mock.Mock()) def test_calc_used_res(self): model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() - strategy = strategies.OutletTempControl() + self.m_model.return_value = model hypervisor = model.get_hypervisor_from_id('Node_0') cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) cap_mem = model.get_resource_from_id(resource.ResourceType.memory) cap_disk = model.get_resource_from_id(resource.ResourceType.disk) - cores_used, mem_used, disk_used = strategy.calc_used_res(model, - hypervisor, - cap_cores, - cap_mem, - cap_disk) + cores_used, mem_used, disk_used = self.strategy.calc_used_res( + hypervisor, cap_cores, cap_mem, cap_disk) self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used)) def test_group_hosts_by_outlet_temp(self): model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() - strategy = strategies.OutletTempControl() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - h1, h2 = strategy.group_hosts_by_outlet_temp(model) + self.m_model.return_value = model + h1, h2 = self.strategy.group_hosts_by_outlet_temp() self.assertEqual('Node_1', h1[0]['hv'].uuid) self.assertEqual('Node_0', h2[0]['hv'].uuid) def test_choose_vm_to_migrate(self): model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() - strategy = strategies.OutletTempControl() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - h1, h2 = strategy.group_hosts_by_outlet_temp(model) - vm_to_mig = strategy.choose_vm_to_migrate(model, h1) + self.m_model.return_value = model + h1, h2 = self.strategy.group_hosts_by_outlet_temp() + vm_to_mig = self.strategy.choose_vm_to_migrate(h1) self.assertEqual('Node_1', vm_to_mig[0].uuid) self.assertEqual('a4cab39b-9828-413a-bf88-f76921bf1517', vm_to_mig[1].uuid) def test_filter_dest_servers(self): model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() - strategy = strategies.OutletTempControl() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - h1, h2 = strategy.group_hosts_by_outlet_temp(model) - vm_to_mig = strategy.choose_vm_to_migrate(model, h1) - dest_hosts = strategy.filter_dest_servers(model, h2, vm_to_mig[1]) + self.m_model.return_value = model + h1, h2 = self.strategy.group_hosts_by_outlet_temp() + vm_to_mig = self.strategy.choose_vm_to_migrate(h1) + dest_hosts = self.strategy.filter_dest_servers(h2, vm_to_mig[1]) self.assertEqual(1, len(dest_hosts)) self.assertEqual('Node_0', dest_hosts[0]['hv'].uuid) def test_exception_model(self): - strategy = strategies.OutletTempControl() - self.assertRaises(exception.ClusterStateNotDefined, strategy.execute, - None) + self.m_model.return_value = None + self.assertRaises( + exception.ClusterStateNotDefined, self.strategy.execute) def test_exception_cluster_empty(self): - strategy = strategies.OutletTempControl() model = model_root.ModelRoot() - self.assertRaises(exception.ClusterEmpty, strategy.execute, model) + self.m_model.return_value = model + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) def test_execute_cluster_empty(self): - strategy = strategies.OutletTempControl() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) model = model_root.ModelRoot() - self.assertRaises(exception.ClusterEmpty, strategy.execute, model) + self.m_model.return_value = model + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) def test_execute_no_workload(self): - strategy = strategies.OutletTempControl() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) + model = self.fake_cluster.generate_scenario_4_with_1_hypervisor_no_vm() + self.m_model.return_value = model - current_state_cluster = faker_cluster_state.FakerModelCollector() - model = current_state_cluster. \ - generate_scenario_4_with_1_hypervisor_no_vm() - - solution = strategy.execute(model) + solution = self.strategy.execute() self.assertEqual([], solution.actions) def test_execute(self): - strategy = strategies.OutletTempControl() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() - solution = strategy.execute(model) + self.m_model.return_value = model + solution = self.strategy.execute() actions_counter = collections.Counter( [action.get('action_type') for action in solution.actions]) @@ -126,11 +128,9 @@ class TestOutletTempControl(base.BaseTestCase): self.assertEqual(1, num_migrations) def test_check_parameters(self): - outlet = strategies.OutletTempControl() - outlet.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() - solution = outlet.execute(model) + self.m_model.return_value = model + solution = self.strategy.execute() loader = default.DefaultActionLoader() for action in solution.actions: loaded_action = loader.load(action['action_type']) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py index 3472d14d3..d53db73f5 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py @@ -17,213 +17,215 @@ # See the License for the specific language governing permissions and # limitations under the License. # + import mock +from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base from watcher.tests.decision_engine.strategy.strategies \ import faker_cluster_and_metrics -class TestSmartConsolidation(base.BaseTestCase): - fake_cluster = faker_cluster_and_metrics.FakerModelCollector() +class TestVMWorkloadConsolidation(base.BaseTestCase): + + def setUp(self): + super(TestVMWorkloadConsolidation, self).setUp() + + # fake cluster + self.fake_cluster = faker_cluster_and_metrics.FakerModelCollector() + + p_model = mock.patch.object( + strategies.VMWorkloadConsolidation, "model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + p_ceilometer = mock.patch.object( + strategies.VMWorkloadConsolidation, "ceilometer", + new_callable=mock.PropertyMock) + self.m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + + # fake metrics + self.fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics( + self.m_model.return_value) + + self.m_model.return_value = model_root.ModelRoot() + self.m_ceilometer.return_value = mock.Mock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.strategy = strategies.VMWorkloadConsolidation(config=mock.Mock()) def test_get_vm_utilization(self): - cluster = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) - vm_0 = cluster.get_vm_from_id("VM_0") + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + self.fake_metrics.model = model + vm_0 = model.get_vm_from_id("VM_0") vm_util = dict(cpu=1.0, ram=1, disk=10) self.assertEqual(vm_util, - strategy.get_vm_utilization(vm_0.uuid, cluster)) + self.strategy.get_vm_utilization(vm_0.uuid, model)) def test_get_hypervisor_utilization(self): - cluster = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) - node_0 = cluster.get_hypervisor_from_id("Node_0") + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + self.fake_metrics.model = model + node_0 = model.get_hypervisor_from_id("Node_0") node_util = dict(cpu=1.0, ram=1, disk=10) - self.assertEqual(node_util, - strategy.get_hypervisor_utilization(node_0, cluster)) + self.assertEqual( + node_util, + self.strategy.get_hypervisor_utilization(node_0, model)) def test_get_hypervisor_capacity(self): - cluster = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) - node_0 = cluster.get_hypervisor_from_id("Node_0") + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + self.fake_metrics.model = model + node_0 = model.get_hypervisor_from_id("Node_0") node_util = dict(cpu=40, ram=64, disk=250) self.assertEqual(node_util, - strategy.get_hypervisor_capacity(node_0, cluster)) + self.strategy.get_hypervisor_capacity(node_0, model)) def test_get_relative_hypervisor_utilization(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model hypervisor = model.get_hypervisor_from_id('Node_0') - rhu = strategy.get_relative_hypervisor_utilization(hypervisor, model) + rhu = self.strategy.get_relative_hypervisor_utilization( + hypervisor, model) expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025} self.assertEqual(expected_rhu, rhu) def test_get_relative_cluster_utilization(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) - cru = strategy.get_relative_cluster_utilization(model) + self.m_model.return_value = model + self.fake_metrics.model = model + cru = self.strategy.get_relative_cluster_utilization(model) expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375} self.assertEqual(expected_cru, cru) def test_add_migration(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h1 = model.get_hypervisor_from_id('Node_0') h2 = model.get_hypervisor_from_id('Node_1') vm_uuid = 'VM_0' - strategy.add_migration(vm_uuid, h1, h2, model) - self.assertEqual(1, len(strategy.solution.actions)) + self.strategy.add_migration(vm_uuid, h1, h2, model) + self.assertEqual(1, len(self.strategy.solution.actions)) expected = {'action_type': 'migrate', 'input_parameters': {'dst_hypervisor': h2.uuid, 'src_hypervisor': h1.uuid, 'migration_type': 'live', 'resource_id': vm_uuid}} - self.assertEqual(expected, strategy.solution.actions[0]) + self.assertEqual(expected, self.strategy.solution.actions[0]) def test_is_overloaded(self): - strategy = strategies.VMWorkloadConsolidation() model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h1 = model.get_hypervisor_from_id('Node_0') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - res = strategy.is_overloaded(h1, model, cc) + res = self.strategy.is_overloaded(h1, model, cc) self.assertEqual(False, res) cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0} - res = strategy.is_overloaded(h1, model, cc) + res = self.strategy.is_overloaded(h1, model, cc) self.assertEqual(False, res) cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0} - res = strategy.is_overloaded(h1, model, cc) + res = self.strategy.is_overloaded(h1, model, cc) self.assertEqual(True, res) def test_vm_fits(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h = model.get_hypervisor_from_id('Node_1') vm_uuid = 'VM_0' cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - res = strategy.vm_fits(vm_uuid, h, model, cc) + res = self.strategy.vm_fits(vm_uuid, h, model, cc) self.assertEqual(True, res) cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0} - res = strategy.vm_fits(vm_uuid, h, model, cc) + res = self.strategy.vm_fits(vm_uuid, h, model, cc) self.assertEqual(False, res) def test_add_action_activate_hypervisor(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h = model.get_hypervisor_from_id('Node_0') - strategy.add_action_activate_hypervisor(h) + self.strategy.add_action_activate_hypervisor(h) expected = [{'action_type': 'change_nova_service_state', 'input_parameters': {'state': 'up', 'resource_id': 'Node_0'}}] - self.assertEqual(expected, strategy.solution.actions) + self.assertEqual(expected, self.strategy.solution.actions) def test_add_action_deactivate_hypervisor(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h = model.get_hypervisor_from_id('Node_0') - strategy.add_action_deactivate_hypervisor(h) + self.strategy.add_action_deactivate_hypervisor(h) expected = [{'action_type': 'change_nova_service_state', 'input_parameters': {'state': 'down', 'resource_id': 'Node_0'}}] - self.assertEqual(expected, strategy.solution.actions) + self.assertEqual(expected, self.strategy.solution.actions) def test_deactivate_unused_hypervisors(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h1 = model.get_hypervisor_from_id('Node_0') h2 = model.get_hypervisor_from_id('Node_1') vm_uuid = 'VM_0' - strategy.deactivate_unused_hypervisors(model) - self.assertEqual(0, len(strategy.solution.actions)) + self.strategy.deactivate_unused_hypervisors(model) + self.assertEqual(0, len(self.strategy.solution.actions)) # Migrate VM to free the hypervisor - strategy.add_migration(vm_uuid, h1, h2, model) + self.strategy.add_migration(vm_uuid, h1, h2, model) - strategy.deactivate_unused_hypervisors(model) + self.strategy.deactivate_unused_hypervisors(model) expected = {'action_type': 'change_nova_service_state', 'input_parameters': {'state': 'down', 'resource_id': 'Node_0'}} - self.assertEqual(2, len(strategy.solution.actions)) - self.assertEqual(expected, strategy.solution.actions[1]) + self.assertEqual(2, len(self.strategy.solution.actions)) + self.assertEqual(expected, self.strategy.solution.actions[1]) def test_offload_phase(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - strategy.offload_phase(model, cc) + self.strategy.offload_phase(model, cc) expected = [] - self.assertEqual(expected, strategy.solution.actions) + self.assertEqual(expected, self.strategy.solution.actions) def test_consolidation_phase(self): model = self.fake_cluster.generate_scenario_1() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h1 = model.get_hypervisor_from_id('Node_0') h2 = model.get_hypervisor_from_id('Node_1') vm_uuid = 'VM_0' cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - strategy.consolidation_phase(model, cc) + self.strategy.consolidation_phase(model, cc) expected = [{'action_type': 'migrate', 'input_parameters': {'dst_hypervisor': h2.uuid, 'src_hypervisor': h1.uuid, 'migration_type': 'live', 'resource_id': vm_uuid}}] - self.assertEqual(expected, strategy.solution.actions) + self.assertEqual(expected, self.strategy.solution.actions) def test_strategy(self): model = self.fake_cluster.generate_scenario_2() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h1 = model.get_hypervisor_from_id('Node_0') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - strategy.offload_phase(model, cc) - strategy.consolidation_phase(model, cc) - strategy.optimize_solution(model) - h2 = strategy.solution.actions[0][ + self.strategy.offload_phase(model, cc) + self.strategy.consolidation_phase(model, cc) + self.strategy.optimize_solution(model) + h2 = self.strategy.solution.actions[0][ 'input_parameters']['dst_hypervisor'] expected = [{'action_type': 'migrate', 'input_parameters': {'dst_hypervisor': h2, @@ -236,18 +238,16 @@ class TestSmartConsolidation(base.BaseTestCase): 'migration_type': 'live', 'resource_id': 'VM_1'}}] - self.assertEqual(expected, strategy.solution.actions) + self.assertEqual(expected, self.strategy.solution.actions) def test_strategy2(self): model = self.fake_cluster.generate_scenario_3() - fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model) - strategy = strategies.VMWorkloadConsolidation() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=fake_metrics.mock_get_statistics) + self.m_model.return_value = model + self.fake_metrics.model = model h1 = model.get_hypervisor_from_id('Node_0') h2 = model.get_hypervisor_from_id('Node_1') cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0} - strategy.offload_phase(model, cc) + self.strategy.offload_phase(model, cc) expected = [{'action_type': 'migrate', 'input_parameters': {'dst_hypervisor': h2.uuid, 'migration_type': 'live', @@ -263,15 +263,15 @@ class TestSmartConsolidation(base.BaseTestCase): 'migration_type': 'live', 'resource_id': 'VM_8', 'src_hypervisor': h1.uuid}}] - self.assertEqual(expected, strategy.solution.actions) - strategy.consolidation_phase(model, cc) + self.assertEqual(expected, self.strategy.solution.actions) + self.strategy.consolidation_phase(model, cc) expected.append({'action_type': 'migrate', 'input_parameters': {'dst_hypervisor': h1.uuid, 'migration_type': 'live', 'resource_id': 'VM_7', 'src_hypervisor': h2.uuid}}) - self.assertEqual(expected, strategy.solution.actions) - strategy.optimize_solution(model) + self.assertEqual(expected, self.strategy.solution.actions) + self.strategy.optimize_solution(model) del expected[3] del expected[1] - self.assertEqual(expected, strategy.solution.actions) + self.assertEqual(expected, self.strategy.solution.actions) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index d361eca03..4a9e5724b 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -23,7 +23,7 @@ from watcher.applier.actions.loading import default from watcher.common import exception from watcher.decision_engine.model import model_root from watcher.decision_engine.model import resource -from watcher.decision_engine.strategy.strategies import workload_balance +from watcher.decision_engine.strategy import strategies from watcher.tests import base from watcher.tests.decision_engine.strategy.strategies \ import faker_cluster_state @@ -32,31 +32,51 @@ from watcher.tests.decision_engine.strategy.strategies \ class TestWorkloadBalance(base.BaseTestCase): - # fake metrics - fake_metrics = faker_metrics_collector.FakerMetricsCollector() - # fake cluster - fake_cluster = faker_cluster_state.FakerModelCollector() + def setUp(self): + super(TestWorkloadBalance, self).setUp() + # fake metrics + self.fake_metrics = faker_metrics_collector.FakerMetricsCollector() + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + p_model = mock.patch.object( + strategies.WorkloadBalance, "model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + p_ceilometer = mock.patch.object( + strategies.BasicConsolidation, "ceilometer", + new_callable=mock.PropertyMock) + self.m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + + self.m_model.return_value = model_root.ModelRoot() + self.m_ceilometer.return_value = mock.Mock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.strategy = strategies.WorkloadBalance(config=mock.Mock()) def test_calc_used_res(self): model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - strategy = workload_balance.WorkloadBalance() + self.m_model.return_value = model hypervisor = model.get_hypervisor_from_id('Node_0') cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) cap_mem = model.get_resource_from_id(resource.ResourceType.memory) cap_disk = model.get_resource_from_id(resource.ResourceType.disk) - cores_used, mem_used, disk_used = strategy.calculate_used_resource( - model, hypervisor, cap_cores, cap_mem, cap_disk) + cores_used, mem_used, disk_used = ( + self.strategy.calculate_used_resource( + hypervisor, cap_cores, cap_mem, cap_disk)) self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40)) def test_group_hosts_by_cpu_util(self): model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - strategy = workload_balance.WorkloadBalance() - strategy.threshold = 30 - strategy.ceilometer = mock.MagicMock( + self.m_model.return_value = model + self.strategy.threshold = 30 + self.strategy.ceilometer = mock.MagicMock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) - h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) + h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util() # print h1, h2, avg, w_map self.assertEqual(h1[0]['hv'].uuid, 'Node_0') self.assertEqual(h2[0]['hv'].uuid, 'Node_1') @@ -64,73 +84,69 @@ class TestWorkloadBalance(base.BaseTestCase): def test_choose_vm_to_migrate(self): model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) - h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) - vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map) + h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util() + vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map) self.assertEqual(vm_to_mig[0].uuid, 'Node_0') self.assertEqual(vm_to_mig[1].uuid, "73b09e16-35b7-4922-804e-e8f5d9b740fc") def test_choose_vm_notfound(self): model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) - h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) + h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util() vms = model.get_all_vms() vms.clear() - vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map) + vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map) self.assertEqual(vm_to_mig, None) def test_filter_destination_hosts(self): model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) - h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model) - vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map) - dest_hosts = strategy.filter_destination_hosts(model, h2, vm_to_mig[1], - avg, w_map) + h1, h2, avg, w_map = self.strategy.group_hosts_by_cpu_util() + vm_to_mig = self.strategy.choose_vm_to_migrate(h1, avg, w_map) + dest_hosts = self.strategy.filter_destination_hosts( + h2, vm_to_mig[1], avg, w_map) self.assertEqual(len(dest_hosts), 1) self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_1') def test_exception_model(self): - strategy = workload_balance.WorkloadBalance() - self.assertRaises(exception.ClusterStateNotDefined, strategy.execute, - None) + self.m_model.return_value = None + self.assertRaises( + exception.ClusterStateNotDefined, self.strategy.execute) def test_exception_cluster_empty(self): - strategy = workload_balance.WorkloadBalance() model = model_root.ModelRoot() - self.assertRaises(exception.ClusterEmpty, strategy.execute, model) + self.m_model.return_value = model + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) def test_execute_cluster_empty(self): - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) model = model_root.ModelRoot() - self.assertRaises(exception.ClusterEmpty, strategy.execute, model) + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) def test_execute_no_workload(self): - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( + model = self.fake_cluster.generate_scenario_4_with_1_hypervisor_no_vm() + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) - - current_state_cluster = faker_cluster_state.FakerModelCollector() - model = current_state_cluster. \ - generate_scenario_4_with_1_hypervisor_no_vm() - - solution = strategy.execute(model) + solution = self.strategy.execute() self.assertEqual(solution.actions, []) def test_execute(self): - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - solution = strategy.execute(model) + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) + solution = self.strategy.execute() actions_counter = collections.Counter( [action.get('action_type') for action in solution.actions]) @@ -138,11 +154,11 @@ class TestWorkloadBalance(base.BaseTestCase): self.assertEqual(num_migrations, 1) def test_check_parameters(self): - strategy = workload_balance.WorkloadBalance() - strategy.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) model = self.fake_cluster.generate_scenario_6_with_2_hypervisors() - solution = strategy.execute(model) + self.m_model.return_value = model + self.strategy.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) + solution = self.strategy.execute() loader = default.DefaultActionLoader() for action in solution.actions: loaded_action = loader.load(action['action_type']) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py index 6220c54fc..310ed9086 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py @@ -19,6 +19,7 @@ import mock +from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base from watcher.tests.decision_engine.strategy.strategies \ @@ -28,40 +29,48 @@ from watcher.tests.decision_engine.strategy.strategies \ class TestWorkloadStabilization(base.BaseTestCase): - # fake metrics - fake_metrics = faker_metrics_collector.FakerMetricsCollector() - # fake cluster - fake_cluster = faker_cluster_state.FakerModelCollector() + def setUp(self): + super(TestWorkloadStabilization, self).setUp() - hosts_load_assert = {'Node_0': - {'cpu_util': 0.07, 'memory.resident': 7.0, - 'vcpus': 40}, - 'Node_1': - {'cpu_util': 0.05, 'memory.resident': 5, - 'vcpus': 40}, - 'Node_2': - {'cpu_util': 0.1, 'memory.resident': 29, - 'vcpus': 40}, - 'Node_3': - {'cpu_util': 0.04, 'memory.resident': 8, - 'vcpus': 40}, - 'Node_4': - {'cpu_util': 0.02, 'memory.resident': 4, - 'vcpus': 40}} + # fake metrics + self.fake_metrics = faker_metrics_collector.FakerMetricsCollector() + + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + self.hosts_load_assert = { + 'Node_0': {'cpu_util': 0.07, 'memory.resident': 7.0, 'vcpus': 40}, + 'Node_1': {'cpu_util': 0.05, 'memory.resident': 5, 'vcpus': 40}, + 'Node_2': {'cpu_util': 0.1, 'memory.resident': 29, 'vcpus': 40}, + 'Node_3': {'cpu_util': 0.04, 'memory.resident': 8, 'vcpus': 40}, + 'Node_4': {'cpu_util': 0.02, 'memory.resident': 4, 'vcpus': 40}} + + p_model = mock.patch.object( + strategies.WorkloadStabilization, "model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + p_ceilometer = mock.patch.object( + strategies.WorkloadStabilization, "ceilometer", + new_callable=mock.PropertyMock) + self.m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + + self.m_model.return_value = model_root.ModelRoot() + self.m_ceilometer.return_value = mock.Mock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.strategy = strategies.WorkloadStabilization(config=mock.Mock()) def test_get_vm_load(self): - model = self.fake_cluster.generate_scenario_1() - sd = strategies.WorkloadStabilization() - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.m_model.return_value = self.fake_cluster.generate_scenario_1() vm_0_dict = {'uuid': 'VM_0', 'vcpus': 10, 'cpu_util': 7, 'memory.resident': 2} - self.assertEqual(sd.get_vm_load("VM_0", model), vm_0_dict) + self.assertEqual(vm_0_dict, self.strategy.get_vm_load("VM_0")) def test_normalize_hosts_load(self): - model = self.fake_cluster.generate_scenario_1() - sd = strategies.WorkloadStabilization() + self.m_model.return_value = self.fake_cluster.generate_scenario_1() fake_hosts = {'Node_0': {'cpu_util': 0.07, 'memory.resident': 7}, 'Node_1': {'cpu_util': 0.05, 'memory.resident': 5}} normalized_hosts = {'Node_0': @@ -70,99 +79,82 @@ class TestWorkloadStabilization(base.BaseTestCase): 'Node_1': {'cpu_util': 0.05, 'memory.resident': 0.03787878787878788}} - self.assertEqual(sd.normalize_hosts_load(fake_hosts, model), - normalized_hosts) + self.assertEqual( + normalized_hosts, + self.strategy.normalize_hosts_load(fake_hosts)) def test_get_hosts_load(self): - sd = strategies.WorkloadStabilization() - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.m_model.return_value = self.fake_cluster.generate_scenario_1() self.assertEqual( - sd.get_hosts_load(self.fake_cluster.generate_scenario_1()), + self.strategy.get_hosts_load(), self.hosts_load_assert) def test_get_sd(self): - sd = strategies.WorkloadStabilization() test_cpu_sd = 0.027 test_ram_sd = 9.3 self.assertEqual( - round(sd.get_sd(self.hosts_load_assert, 'cpu_util'), 3), + round(self.strategy.get_sd( + self.hosts_load_assert, 'cpu_util'), 3), test_cpu_sd) self.assertEqual( - round(sd.get_sd(self.hosts_load_assert, 'memory.resident'), 1), + round(self.strategy.get_sd( + self.hosts_load_assert, 'memory.resident'), 1), test_ram_sd) def test_calculate_weighted_sd(self): - sd = strategies.WorkloadStabilization() sd_case = [0.5, 0.75] - self.assertEqual(sd.calculate_weighted_sd(sd_case), 1.25) + self.assertEqual(self.strategy.calculate_weighted_sd(sd_case), 1.25) def test_calculate_migration_case(self): - model = self.fake_cluster.generate_scenario_1() - sd = strategies.WorkloadStabilization() - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - self.assertEqual(sd.calculate_migration_case( - self.hosts_load_assert, "VM_5", "Node_2", "Node_1", - model)[-1]["Node_1"], + self.m_model.return_value = self.fake_cluster.generate_scenario_1() + self.assertEqual( + self.strategy.calculate_migration_case( + self.hosts_load_assert, "VM_5", + "Node_2", "Node_1")[-1]["Node_1"], {'cpu_util': 2.55, 'memory.resident': 21, 'vcpus': 40}) def test_simulate_migrations(self): - sd = strategies.WorkloadStabilization() - sd.host_choice = 'retry' - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + self.strategy.host_choice = 'retry' self.assertEqual( - len(sd.simulate_migrations(self.fake_cluster.generate_scenario_1(), - self.hosts_load_assert)), - 8) + 8, + len(self.strategy.simulate_migrations(self.hosts_load_assert))) def test_check_threshold(self): - sd = strategies.WorkloadStabilization() - sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - sd.simulate_migrations = mock.Mock(return_value=True) - self.assertTrue( - sd.check_threshold(self.fake_cluster.generate_scenario_1())) + self.m_model.return_value = self.fake_cluster.generate_scenario_1() + self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} + self.strategy.simulate_migrations = mock.Mock(return_value=True) + self.assertTrue(self.strategy.check_threshold()) def test_execute_one_migration(self): - sd = strategies.WorkloadStabilization() - model = self.fake_cluster.generate_scenario_1() - sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4', - 's_host': 'Node_2', - 'host': 'Node_1'}]) - with mock.patch.object(sd, 'migrate') as mock_migration: - sd.execute(model) - mock_migration.assert_called_once_with(model, 'VM_4', 'Node_2', - 'Node_1') + self.m_model.return_value = self.fake_cluster.generate_scenario_1() + self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} + self.strategy.simulate_migrations = mock.Mock( + return_value=[{'vm': 'VM_4', 's_host': 'Node_2', 'host': 'Node_1'}] + ) + with mock.patch.object(self.strategy, 'migrate') as mock_migration: + self.strategy.execute() + mock_migration.assert_called_once_with( + 'VM_4', 'Node_2', 'Node_1') def test_execute_multiply_migrations(self): - sd = strategies.WorkloadStabilization() - model = self.fake_cluster.generate_scenario_1() - sd.thresholds = {'cpu_util': 0.00001, 'memory.resident': 0.0001} - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4', - 's_host': 'Node_2', - 'host': 'Node_1'}, - {'vm': 'VM_3', - 's_host': 'Node_2', - 'host': 'Node_4'}]) - with mock.patch.object(sd, 'migrate') as mock_migrate: - sd.execute(model) + self.m_model.return_value = self.fake_cluster.generate_scenario_1() + self.strategy.thresholds = {'cpu_util': 0.00001, + 'memory.resident': 0.0001} + self.strategy.simulate_migrations = mock.Mock( + return_value=[{'vm': 'VM_4', 's_host': 'Node_2', 'host': 'Node_1'}, + {'vm': 'VM_3', 's_host': 'Node_2', 'host': 'Node_3'}] + ) + with mock.patch.object(self.strategy, 'migrate') as mock_migrate: + self.strategy.execute() self.assertEqual(mock_migrate.call_count, 1) def test_execute_nothing_to_migrate(self): - sd = strategies.WorkloadStabilization() - model = self.fake_cluster.generate_scenario_1() - sd.thresholds = {'cpu_util': 0.042, 'memory.resident': 0.0001} - sd.ceilometer = mock.MagicMock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - sd.simulate_migrations = mock.Mock(return_value=False) - with mock.patch.object(sd, 'migrate') as mock_migrate: - sd.execute(model) + self.m_model.return_value = self.fake_cluster.generate_scenario_1() + self.strategy.thresholds = {'cpu_util': 0.042, + 'memory.resident': 0.0001} + self.strategy.simulate_migrations = mock.Mock(return_value=False) + with mock.patch.object(self.strategy, 'migrate') as mock_migrate: + self.strategy.execute() mock_migrate.assert_not_called()