diff --git a/requirements.txt b/requirements.txt index 909e1901f..13ce8abe9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ jsonpatch>=1.1 # BSD keystoneauth1>=2.1.0 # Apache-2.0 keystonemiddleware!=4.1.0,!=4.5.0,>=4.0.0 # Apache-2.0 oslo.concurrency>=3.8.0 # Apache-2.0 +oslo.cache>=1.5.0 # Apache-2.0 oslo.config>=3.9.0 # Apache-2.0 oslo.context>=2.2.0 # Apache-2.0 oslo.db>=4.1.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 107329b7d..24ed95f20 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,6 +50,7 @@ watcher_strategies = basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation + workload_stabilization = watcher.decision_engine.strategy.strategies.workload_stabilization:WorkloadStabilization watcher_actions = migrate = watcher.applier.actions.migration:Migrate diff --git a/watcher/common/exception.py b/watcher/common/exception.py index 06543872c..706fc89d7 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -298,6 +298,14 @@ class NoAvailableStrategyForGoal(WatcherException): msg_fmt = _("No strategy could be found to achieve the '%(goal)s' goal.") +class NoMetricValuesForVM(WatcherException): + msg_fmt = _("No values returned by %(resource_id)s for %(metric_name)s.") + + +class NoSuchMetricForHost(WatcherException): + msg_fmt = _("No %(metric)s metric for %(host)s found.") + + # Model class InstanceNotFound(WatcherException): diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py index 87f5f092c..741421201 100644 --- a/watcher/decision_engine/strategy/strategies/__init__.py +++ b/watcher/decision_engine/strategy/strategies/__init__.py @@ -20,11 +20,14 @@ from watcher.decision_engine.strategy.strategies import dummy_strategy from watcher.decision_engine.strategy.strategies import outlet_temp_control from watcher.decision_engine.strategy.strategies import \ vm_workload_consolidation +from watcher.decision_engine.strategy.strategies import workload_stabilization BasicConsolidation = basic_consolidation.BasicConsolidation OutletTempControl = outlet_temp_control.OutletTempControl DummyStrategy = dummy_strategy.DummyStrategy VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation +WorkloadStabilization = workload_stabilization.WorkloadStabilization __all__ = ("BasicConsolidation", "OutletTempControl", - "DummyStrategy", "VMWorkloadConsolidation") + "DummyStrategy", "VMWorkloadConsolidation", + "WorkloadStabilization") diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index e35b92d18..6dc78e1d4 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -222,3 +222,19 @@ class ThermalOptimizationBaseStrategy(BaseStrategy): @classmethod def get_translatable_goal_display_name(cls): return "Thermal optimization" + + +@six.add_metaclass(abc.ABCMeta) +class WorkloadStabilizationBaseStrategy(BaseStrategy): + + @classmethod + def get_goal_name(cls): + return "WORKLOAD_BALANCING" + + @classmethod + def get_goal_display_name(cls): + return _("Workload balancing") + + @classmethod + def get_translatable_goal_display_name(cls): + return "Workload balancing" diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py new file mode 100644 index 000000000..e0de7c633 --- /dev/null +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -0,0 +1,414 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica LLC +# +# Authors: Alexander Chadin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from copy import deepcopy +import itertools +import math +import random + +import oslo_cache +from oslo_config import cfg +from oslo_log import log + +from watcher._i18n import _LI, _ +from watcher.common import exception +from watcher.decision_engine.model import resource +from watcher.decision_engine.model import vm_state +from watcher.decision_engine.strategy.strategies import base +from watcher.metrics_engine.cluster_history import ceilometer as \ + ceilometer_cluster_history + +LOG = log.getLogger(__name__) + +metrics = ['cpu_util', 'memory.resident'] +thresholds_dict = {'cpu_util': 0.2, 'memory.resident': 0.2} +weights_dict = {'cpu_util_weight': 1.0, 'memory.resident_weight': 1.0} +vm_host_measures = {'cpu_util': 'hardware.cpu.util', + 'memory.resident': 'hardware.memory.used'} + +ws_opts = [ + cfg.ListOpt('metrics', + default=metrics, + required=True, + help='Metrics used as rates of cluster loads.'), + cfg.DictOpt('thresholds', + default=thresholds_dict, + help=''), + cfg.DictOpt('weights', + default=weights_dict, + help='These weights used to calculate ' + 'common standard deviation. Name of weight ' + 'contains meter name and _weight suffix.'), + cfg.StrOpt('host_choice', + default='retry', + required=True, + help="Method of host's choice."), + cfg.IntOpt('retry_count', + default=1, + required=True, + help='Count of random returned hosts.'), +] + +CONF = cfg.CONF + +CONF.register_opts(ws_opts, 'watcher_strategies.workload_stabilization') + + +def _set_memoize(conf): + oslo_cache.configure(conf) + region = oslo_cache.create_region() + configured_region = oslo_cache.configure_cache_region(conf, region) + return oslo_cache.core.get_memoization_decorator(conf, + configured_region, + 'cache') + + +class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): + """Workload Stabilization control using live migration + + *Description* + + This is workload stabilization strategy based on standard deviation + algorithm. The goal is to determine if there is an overload in a cluster + and respond to it by migrating VMs to stabilize the cluster. + + *Requirements* + + * Software: Ceilometer component ceilometer-compute running + in each compute host, and Ceilometer API can report such telemetries + ``memory.resident`` and ``cpu_util`` successfully. + * You must have at least 2 physical compute nodes to run this strategy. + + *Limitations* + + - It assume that live migrations are possible + - Load on the system is sufficiently stable. + + *Spec URL* + + https://review.openstack.org/#/c/286153/ + """ + + MIGRATION = "migrate" + MEMOIZE = _set_memoize(CONF) + + def __init__(self, osc=None): + super(WorkloadStabilization, self).__init__(osc) + self._ceilometer = None + self._nova = None + self.weights = CONF['watcher_strategies.workload_stabilization']\ + .weights + self.metrics = CONF['watcher_strategies.workload_stabilization']\ + .metrics + self.thresholds = CONF['watcher_strategies.workload_stabilization']\ + .thresholds + self.host_choice = CONF['watcher_strategies.workload_stabilization']\ + .host_choice + + @classmethod + def get_name(cls): + return "WORKLOAD_BALANCING" + + @classmethod + def get_display_name(cls): + return _("Workload balancing") + + @classmethod + def get_translatable_display_name(cls): + return "Workload balancing" + + @property + def ceilometer(self): + if self._ceilometer is None: + self._ceilometer = (ceilometer_cluster_history. + CeilometerClusterHistory(osc=self.osc)) + return self._ceilometer + + @property + def nova(self): + if self._nova is None: + self._nova = self.osc.nova() + return self._nova + + @nova.setter + def nova(self, n): + self._nova = n + + @ceilometer.setter + def ceilometer(self, c): + self._ceilometer = c + + def transform_vm_cpu(self, vm_load, host_vcpus): + """This method transforms vm cpu utilization to overall host cpu utilization. + + :param vm_load: dict that contains vm uuid and utilization info. + :param host_vcpus: int + :return: float value + """ + return vm_load['cpu_util'] * (vm_load['vcpus'] / float(host_vcpus)) + + @MEMOIZE + def get_vm_load(self, vm_uuid, current_model): + """Gathering vm load through ceilometer statistic. + + :param vm_uuid: vm for which statistic is gathered. + :param current_model: the cluster model + :return: dict + """ + LOG.debug(_LI('get_vm_load started')) + vm_vcpus = current_model.get_resource_from_id( + resource.ResourceType.cpu_cores).get_capacity( + current_model.get_vm_from_id(vm_uuid)) + vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus} + for meter in self.metrics: + avg_meter = self.ceilometer.statistic_aggregation( + resource_id=vm_uuid, + meter_name=meter, + period="120", + aggregate='min' + ) + if avg_meter is None: + raise exception.NoMetricValuesForVM(resource_id=vm_uuid, + metric_name=meter) + vm_load[meter] = avg_meter + return vm_load + + def normalize_hosts_load(self, hosts, current_model): + normalized_hosts = deepcopy(hosts) + for host in normalized_hosts: + if 'cpu_util' in normalized_hosts[host]: + normalized_hosts[host]['cpu_util'] /= float(100) + + if 'memory.resident' in normalized_hosts[host]: + h_memory = current_model.get_resource_from_id( + resource.ResourceType.memory).get_capacity( + current_model.get_hypervisor_from_id(host)) + normalized_hosts[host]['memory.resident'] /= float(h_memory) + + return normalized_hosts + + def get_hosts_load(self, current_model): + """Get load of every host by gathering vms load""" + hosts_load = {} + for hypervisor_id in current_model.get_all_hypervisors(): + hosts_load[hypervisor_id] = {} + host_vcpus = current_model.get_resource_from_id( + resource.ResourceType.cpu_cores).get_capacity( + current_model.get_hypervisor_from_id(hypervisor_id)) + hosts_load[hypervisor_id]['vcpus'] = host_vcpus + + for metric in self.metrics: + avg_meter = self.ceilometer.statistic_aggregation( + resource_id=hypervisor_id, + meter_name=vm_host_measures[metric], + period="60", + aggregate='avg' + ) + if avg_meter is None: + raise exception.NoSuchMetricForHost( + metric=vm_host_measures[metric], + host=hypervisor_id) + hosts_load[hypervisor_id][metric] = avg_meter + return hosts_load + + def get_sd(self, hosts, meter_name): + """Get standard deviation among hosts by specified meter""" + mean = 0 + variaton = 0 + for host_id in hosts: + mean += hosts[host_id][meter_name] + mean /= len(hosts) + for host_id in hosts: + variaton += (hosts[host_id][meter_name] - mean) ** 2 + variaton /= len(hosts) + sd = math.sqrt(variaton) + return sd + + def calculate_weighted_sd(self, sd_case): + """Calculate common standard deviation among meters on host""" + weighted_sd = 0 + for metric, value in zip(self.metrics, sd_case): + try: + weighted_sd += value * float(self.weights[metric + '_weight']) + except KeyError as exc: + LOG.exception(exc) + raise exception.WatcherException( + _("Incorrect mapping: could not find associated weight" + " for %s in weight dict.") % metric) + return weighted_sd + + def calculate_migration_case(self, hosts, vm_id, src_hp_id, dst_hp_id, + current_model): + """Calculate migration case + + Return list of standard deviation values, that appearing in case of + migration of vm from source host to destination host + :param hosts: hosts with their workload + :param vm_id: the virtual machine + :param src_hp_id: the source hypervisor id + :param dst_hp_id: the destination hypervisor id + :param current_model: the cluster model + :return: list of standard deviation values + """ + migration_case = [] + new_hosts = deepcopy(hosts) + vm_load = self.get_vm_load(vm_id, current_model) + d_host_vcpus = new_hosts[dst_hp_id]['vcpus'] + s_host_vcpus = new_hosts[src_hp_id]['vcpus'] + for metric in self.metrics: + if metric is 'cpu_util': + new_hosts[src_hp_id][metric] -= self.transform_vm_cpu( + vm_load, + s_host_vcpus) + new_hosts[dst_hp_id][metric] += self.transform_vm_cpu( + vm_load, + d_host_vcpus) + else: + new_hosts[src_hp_id][metric] -= vm_load[metric] + new_hosts[dst_hp_id][metric] += vm_load[metric] + normalized_hosts = self.normalize_hosts_load(new_hosts, current_model) + for metric in self.metrics: + migration_case.append(self.get_sd(normalized_hosts, metric)) + migration_case.append(new_hosts) + return migration_case + + def simulate_migrations(self, current_model, hosts): + """Make sorted list of pairs vm:dst_host""" + def yield_hypervisors(hypervisors): + ct = CONF['watcher_strategies.workload_stabilization'].retry_count + if self.host_choice == 'cycle': + for i in itertools.cycle(hypervisors): + yield [i] + if self.host_choice == 'retry': + while True: + yield random.sample(hypervisors, ct) + if self.host_choice == 'fullsearch': + while True: + yield hypervisors + + vm_host_map = [] + for source_hp_id in current_model.get_all_hypervisors(): + hypervisors = list(current_model.get_all_hypervisors()) + hypervisors.remove(source_hp_id) + hypervisor_list = yield_hypervisors(hypervisors) + vms_id = current_model.get_mapping(). \ + get_node_vms_from_id(source_hp_id) + for vm_id in vms_id: + min_sd_case = {'value': len(self.metrics)} + vm = current_model.get_vm_from_id(vm_id) + if vm.state not in [vm_state.VMState.ACTIVE.value, + vm_state.VMState.PAUSED.value]: + continue + for dst_hp_id in next(hypervisor_list): + sd_case = self.calculate_migration_case(hosts, vm_id, + source_hp_id, + dst_hp_id, + current_model) + + weighted_sd = self.calculate_weighted_sd(sd_case[:-1]) + + if weighted_sd < min_sd_case['value']: + min_sd_case = {'host': dst_hp_id, 'value': weighted_sd, + 's_host': source_hp_id, 'vm': vm_id} + vm_host_map.append(min_sd_case) + break + return sorted(vm_host_map, key=lambda x: x['value']) + + def check_threshold(self, current_model): + """Check if cluster is needed in balancing""" + hosts_load = self.get_hosts_load(current_model) + normalized_load = self.normalize_hosts_load(hosts_load, current_model) + for metric in self.metrics: + metric_sd = self.get_sd(normalized_load, metric) + if metric_sd > float(self.thresholds[metric]): + return self.simulate_migrations(current_model, hosts_load) + + def add_migration(self, + resource_id, + migration_type, + src_hypervisor, + dst_hypervisor): + parameters = {'migration_type': migration_type, + 'src_hypervisor': src_hypervisor, + 'dst_hypervisor': dst_hypervisor} + self.solution.add_action(action_type=self.MIGRATION, + resource_id=resource_id, + input_parameters=parameters) + + def create_migration_vm(self, current_model, mig_vm, mig_src_hypervisor, + mig_dst_hypervisor): + """Create migration VM """ + if current_model.get_mapping().migrate_vm( + mig_vm, mig_src_hypervisor, mig_dst_hypervisor): + self.add_migration(mig_vm.uuid, 'live', + mig_src_hypervisor.uuid, + mig_dst_hypervisor.uuid) + + def migrate(self, current_model, vm_uuid, src_host, dst_host): + mig_vm = current_model.get_vm_from_id(vm_uuid) + mig_src_hypervisor = current_model.get_hypervisor_from_id(src_host) + mig_dst_hypervisor = current_model.get_hypervisor_from_id(dst_host) + self.create_migration_vm(current_model, mig_vm, mig_src_hypervisor, + mig_dst_hypervisor) + + def fill_solution(self, current_model): + self.solution.model = current_model + self.solution.efficacy = 100 + return self.solution + + def execute(self, orign_model): + LOG.info(_LI("Initializing Workload Stabilization")) + current_model = orign_model + + if orign_model is None: + raise exception.ClusterStateNotDefined() + + migration = self.check_threshold(current_model) + if migration: + hosts_load = self.get_hosts_load(current_model) + min_sd = 1 + balanced = False + for vm_host in migration: + dst_hp_disk = current_model.get_resource_from_id( + resource.ResourceType.disk).get_capacity( + current_model.get_hypervisor_from_id(vm_host['host'])) + vm_disk = current_model.get_resource_from_id( + resource.ResourceType.disk).get_capacity( + current_model.get_vm_from_id(vm_host['vm'])) + if vm_disk > dst_hp_disk: + continue + vm_load = self.calculate_migration_case(hosts_load, + vm_host['vm'], + vm_host['s_host'], + vm_host['host'], + current_model) + weighted_sd = self.calculate_weighted_sd(vm_load[:-1]) + if weighted_sd < min_sd: + min_sd = weighted_sd + hosts_load = vm_load[-1] + self.migrate(current_model, vm_host['vm'], + vm_host['s_host'], vm_host['host']) + + for metric, value in zip(self.metrics, vm_load[:-1]): + if value < float(self.thresholds[metric]): + balanced = True + break + if balanced: + break + return self.fill_solution(current_model) diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py b/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py index 331400592..74379cf27 100644 --- a/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py +++ b/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py @@ -30,10 +30,16 @@ class FakerMetricsCollector(object): def mock_get_statistics(self, resource_id, meter_name, period, aggregate='avg'): result = 0 - if meter_name == "compute.node.cpu.percent": + if meter_name == "hardware.cpu.util": result = self.get_usage_node_cpu(resource_id) + elif meter_name == "compute.node.cpu.percent": + result = self.get_usage_node_cpu(resource_id) + elif meter_name == "hardware.memory.used": + result = self.get_usage_node_ram(resource_id) elif meter_name == "cpu_util": result = self.get_average_usage_vm_cpu(resource_id) + elif meter_name == "memory.resident": + result = self.get_average_usage_vm_memory(resource_id) elif meter_name == "hardware.ipmi.node.outlet_temperature": result = self.get_average_outlet_temperature(resource_id) return result @@ -49,6 +55,20 @@ class FakerMetricsCollector(object): return mock[str(uuid)] + def get_usage_node_ram(self, uuid): + mock = {} + mock['Node_0'] = 7 + mock['Node_1'] = 5 + mock['Node_2'] = 29 + mock['Node_3'] = 8 + mock['Node_4'] = 4 + + if uuid not in mock.keys(): + # mock[uuid] = random.randint(1, 4) + mock[uuid] = 8 + + return float(mock[str(uuid)]) + def get_usage_node_cpu(self, uuid): """The last VM CPU usage values to average @@ -77,6 +97,12 @@ class FakerMetricsCollector(object): # node 4 mock['VM_7_hostname_7'] = 4 + mock['Node_0'] = 7 + mock['Node_1'] = 5 + mock['Node_2'] = 10 + mock['Node_3'] = 4 + mock['Node_4'] = 2 + if uuid not in mock.keys(): # mock[uuid] = random.randint(1, 4) mock[uuid] = 8 diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py new file mode 100644 index 000000000..d1d62574f --- /dev/null +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py @@ -0,0 +1,168 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica LLC +# +# Authors: Alexander Chadin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import mock + +from watcher.decision_engine.strategy import strategies +from watcher.tests import base +from watcher.tests.decision_engine.strategy.strategies \ + import faker_cluster_state +from watcher.tests.decision_engine.strategy.strategies \ + import faker_metrics_collector + + +class TestWorkloadStabilization(base.BaseTestCase): + # fake metrics + fake_metrics = faker_metrics_collector.FakerMetricsCollector() + + # fake cluster + fake_cluster = faker_cluster_state.FakerModelCollector() + + hosts_load_assert = {'Node_0': + {'cpu_util': 7.0, 'memory.resident': 7.0, + 'vcpus': 40}, + 'Node_1': + {'cpu_util': 5.0, 'memory.resident': 5, + 'vcpus': 40}, + 'Node_2': + {'cpu_util': 10.0, 'memory.resident': 29, + 'vcpus': 40}, + 'Node_3': + {'cpu_util': 4.0, 'memory.resident': 8, + 'vcpus': 40}, + 'Node_4': + {'cpu_util': 2.0, 'memory.resident': 4, + 'vcpus': 40}} + + def test_get_vm_load(self): + model = self.fake_cluster.generate_scenario_1() + sd = strategies.WorkloadStabilization() + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + vm_0_dict = {'uuid': 'VM_0', 'vcpus': 10, + 'cpu_util': 7, 'memory.resident': 2} + self.assertEqual(sd.get_vm_load("VM_0", model), vm_0_dict) + + def test_normalize_hosts_load(self): + model = self.fake_cluster.generate_scenario_1() + sd = strategies.WorkloadStabilization() + fake_hosts = {'Node_0': {'cpu_util': 7.0, 'memory.resident': 7}, + 'Node_1': {'cpu_util': 5.0, 'memory.resident': 5}} + normalized_hosts = {'Node_0': + {'cpu_util': 0.07, + 'memory.resident': 0.05303030303030303}, + 'Node_1': + {'cpu_util': 0.05, + 'memory.resident': 0.03787878787878788}} + self.assertEqual(sd.normalize_hosts_load(fake_hosts, model), + normalized_hosts) + + def test_get_hosts_load(self): + sd = strategies.WorkloadStabilization() + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.assertEqual( + sd.get_hosts_load(self.fake_cluster.generate_scenario_1()), + self.hosts_load_assert) + + def test_get_sd(self): + sd = strategies.WorkloadStabilization() + test_cpu_sd = 2.7 + test_ram_sd = 9.3 + self.assertEqual( + round(sd.get_sd(self.hosts_load_assert, 'cpu_util'), 1), + test_cpu_sd) + self.assertEqual( + round(sd.get_sd(self.hosts_load_assert, 'memory.resident'), 1), + test_ram_sd) + + def test_calculate_weighted_sd(self): + sd = strategies.WorkloadStabilization() + sd_case = [0.5, 0.75] + self.assertEqual(sd.calculate_weighted_sd(sd_case), 1.25) + + def test_calculate_migration_case(self): + model = self.fake_cluster.generate_scenario_1() + sd = strategies.WorkloadStabilization() + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.assertEqual(sd.calculate_migration_case( + self.hosts_load_assert, "VM_5", "Node_2", "Node_1", + model)[-1]["Node_1"], + {'cpu_util': 7.5, 'memory.resident': 21, 'vcpus': 40}) + + def test_simulate_migrations(self): + sd = strategies.WorkloadStabilization() + sd.host_choice = 'retry' + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.assertEqual( + len(sd.simulate_migrations(self.fake_cluster.generate_scenario_1(), + self.hosts_load_assert)), + 8) + + def test_check_threshold(self): + sd = strategies.WorkloadStabilization() + sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + sd.simulate_migrations = mock.Mock(return_value=True) + self.assertTrue( + sd.check_threshold(self.fake_cluster.generate_scenario_1())) + + def test_execute_one_migration(self): + sd = strategies.WorkloadStabilization() + model = self.fake_cluster.generate_scenario_1() + sd.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4', + 's_host': 'Node_2', + 'host': 'Node_1'}]) + with mock.patch.object(sd, 'migrate') as mock_migration: + sd.execute(model) + mock_migration.assert_called_once_with(model, 'VM_4', 'Node_2', + 'Node_1') + + def test_execute_multiply_migrations(self): + sd = strategies.WorkloadStabilization() + model = self.fake_cluster.generate_scenario_1() + sd.thresholds = {'cpu_util': 0.022, 'memory.resident': 0.0001} + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + sd.simulate_migrations = mock.Mock(return_value=[{'vm': 'VM_4', + 's_host': 'Node_2', + 'host': 'Node_1'}, + {'vm': 'VM_3', + 's_host': 'Node_2', + 'host': 'Node_3'}]) + with mock.patch.object(sd, 'migrate') as mock_migrate: + sd.execute(model) + self.assertEqual(mock_migrate.call_count, 2) + + def test_execute_nothing_to_migrate(self): + sd = strategies.WorkloadStabilization() + model = self.fake_cluster.generate_scenario_1() + sd.thresholds = {'cpu_util': 0.042, 'memory.resident': 0.0001} + sd.ceilometer = mock.MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + sd.simulate_migrations = mock.Mock(return_value=False) + with mock.patch.object(sd, 'migrate') as mock_migrate: + sd.execute(model) + mock_migrate.assert_not_called()