diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index bf55fbafa..0be31be71 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -17,7 +17,7 @@ # limitations under the License. # -from copy import deepcopy +import copy import itertools import math import random @@ -34,40 +34,8 @@ from watcher.decision_engine.model import element from watcher.decision_engine.strategy.strategies import base LOG = log.getLogger(__name__) - -metrics = ['cpu_util', 'memory.resident'] -thresholds_dict = {'cpu_util': 0.2, 'memory.resident': 0.2} -weights_dict = {'cpu_util_weight': 1.0, 'memory.resident_weight': 1.0} -instance_host_measures = {'cpu_util': 'hardware.cpu.util', - 'memory.resident': 'hardware.memory.used'} - -ws_opts = [ - cfg.ListOpt('metrics', - default=metrics, - required=True, - help='Metrics used as rates of cluster loads.'), - cfg.DictOpt('thresholds', - default=thresholds_dict, - help=''), - cfg.DictOpt('weights', - default=weights_dict, - help='These weights used to calculate ' - 'common standard deviation. Name of weight ' - 'contains meter name and _weight suffix.'), - cfg.StrOpt('host_choice', - default='retry', - required=True, - help="Method of host's choice."), - cfg.IntOpt('retry_count', - default=1, - required=True, - help='Count of random returned hosts.'), -] - CONF = cfg.CONF -CONF.register_opts(ws_opts, 'watcher_strategies.workload_stabilization') - def _set_memoize(conf): oslo_cache.configure(conf) @@ -111,14 +79,12 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): super(WorkloadStabilization, self).__init__(config, osc) self._ceilometer = None self._nova = None - self.weights = CONF['watcher_strategies.workload_stabilization']\ - .weights - self.metrics = CONF['watcher_strategies.workload_stabilization']\ - .metrics - self.thresholds = CONF['watcher_strategies.workload_stabilization']\ - .thresholds - self.host_choice = CONF['watcher_strategies.workload_stabilization']\ - .host_choice + self.weights = None + self.metrics = None + self.thresholds = None + self.host_choice = None + self.instance_metrics = None + self.retry_count = None @classmethod def get_name(cls): @@ -132,6 +98,55 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def get_translatable_display_name(cls): return "Workload stabilization" + @classmethod + def get_schema(cls): + return { + "properties": { + "metrics": { + "description": "Metrics used as rates of cluster loads.", + "type": "array", + "default": ["cpu_util", "memory.resident"] + }, + "thresholds": { + "description": "Dict where key is a metric and value " + "is a trigger value.", + "type": "object", + "default": {"cpu_util": 0.2, "memory.resident": 0.2} + }, + "weights": { + "description": "These weights used to calculate " + "common standard deviation. Name of weight" + " contains meter name and _weight suffix.", + "type": "object", + "default": {"cpu_util_weight": 1.0, + "memory.resident_weight": 1.0} + }, + "instance_metrics": { + "description": "Mapping to get hardware statistics using" + " instance metrics", + "type": "object", + "default": {"cpu_util": "hardware.cpu.util", + "memory.resident": "hardware.memory.used"} + }, + "host_choice": { + "description": "Method of host's choice. There are cycle," + " retry and fullsearch methods. " + "Cycle will iterate hosts in cycle. " + "Retry will get some hosts random " + "(count defined in retry_count option). " + "Fullsearch will return each host " + "from list.", + "type": "string", + "default": "retry" + }, + "retry_count": { + "description": "Count of random returned hosts", + "type": "number", + "default": 1 + } + } + } + @property def ceilometer(self): if self._ceilometer is None: @@ -190,7 +205,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): return instance_load def normalize_hosts_load(self, hosts): - normalized_hosts = deepcopy(hosts) + normalized_hosts = copy.deepcopy(hosts) for host in normalized_hosts: if 'memory.resident' in normalized_hosts[host]: h_memory = self.compute_model.get_resource_from_id( @@ -213,13 +228,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): for metric in self.metrics: avg_meter = self.ceilometer.statistic_aggregation( resource_id=node_id, - meter_name=instance_host_measures[metric], + meter_name=self.instance_metrics[metric], period="60", aggregate='avg' ) if avg_meter is None: raise exception.NoSuchMetricForHost( - metric=instance_host_measures[metric], + metric=self.instance_metrics[metric], host=node_id) hosts_load[node_id][metric] = avg_meter return hosts_load @@ -263,7 +278,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): :return: list of standard deviation values """ migration_case = [] - new_hosts = deepcopy(hosts) + new_hosts = copy.deepcopy(hosts) instance_load = self.get_instance_load(instance_id) d_host_vcpus = new_hosts[dst_node_id]['vcpus'] s_host_vcpus = new_hosts[src_node_id]['vcpus'] @@ -287,22 +302,22 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def simulate_migrations(self, hosts): """Make sorted list of pairs instance:dst_host""" def yield_nodes(nodes): - ct = CONF['watcher_strategies.workload_stabilization'].retry_count if self.host_choice == 'cycle': for i in itertools.cycle(nodes): yield [i] if self.host_choice == 'retry': while True: - yield random.sample(nodes, ct) + yield random.sample(nodes, self.retry_count) if self.host_choice == 'fullsearch': while True: yield nodes instance_host_map = [] - for source_hp_id in self.compute_model.get_all_compute_nodes(): - nodes = list(self.compute_model.get_all_compute_nodes()) - nodes.remove(source_hp_id) - node_list = yield_nodes(nodes) + nodes = list(self.compute_model.get_all_compute_nodes()) + for source_hp_id in nodes: + c_nodes = copy.copy(nodes) + c_nodes.remove(source_hp_id) + node_list = yield_nodes(c_nodes) instances_id = self.compute_model.get_mapping(). \ get_node_instances_from_id(source_hp_id) for instance_id in instances_id: @@ -323,7 +338,6 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): 'host': dst_node_id, 'value': weighted_sd, 's_host': source_hp_id, 'instance': instance_id} instance_host_map.append(min_sd_case) - break return sorted(instance_host_map, key=lambda x: x['value']) def check_threshold(self): @@ -375,6 +389,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): if not self.compute_model: raise exception.ClusterStateNotDefined() + self.weights = self.input_parameters.weights + self.metrics = self.input_parameters.metrics + self.thresholds = self.input_parameters.thresholds + self.host_choice = self.input_parameters.host_choice + self.instance_metrics = self.input_parameters.instance_metrics + self.retry_count = self.input_parameters.retry_count + def do_execute(self): migration = self.check_threshold() if migration: diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py index ef1cab69a..130575624 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py @@ -19,6 +19,7 @@ import mock +from watcher.common import utils from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base @@ -62,6 +63,26 @@ class TestWorkloadStabilization(base.TestCase): self.m_ceilometer.return_value = mock.Mock( statistic_aggregation=self.fake_metrics.mock_get_statistics) self.strategy = strategies.WorkloadStabilization(config=mock.Mock()) + self.strategy.input_parameters = utils.Struct() + self.strategy.input_parameters.update( + {'metrics': ["cpu_util", "memory.resident"], + 'thresholds': {"cpu_util": 0.2, "memory.resident": 0.2}, + 'weights': {"cpu_util_weight": 1.0, + "memory.resident_weight": 1.0}, + 'instance_metrics': + {"cpu_util": "hardware.cpu.util", + "memory.resident": "hardware.memory.used"}, + 'host_choice': 'retry', + 'retry_count': 1}) + self.strategy.metrics = ["cpu_util", "memory.resident"] + self.strategy.thresholds = {"cpu_util": 0.2, "memory.resident": 0.2} + self.strategy.weights = {"cpu_util_weight": 1.0, + "memory.resident_weight": 1.0} + self.strategy.instance_metrics = {"cpu_util": "hardware.cpu.util", + "memory.resident": + "hardware.memory.used"} + self.strategy.host_choice = 'retry' + self.strategy.retry_count = 1 def test_get_instance_load(self): self.m_model.return_value = self.fake_cluster.generate_scenario_1() @@ -138,7 +159,7 @@ class TestWorkloadStabilization(base.TestCase): 'host': 'Node_1'}] ) with mock.patch.object(self.strategy, 'migrate') as mock_migration: - self.strategy.execute() + self.strategy.do_execute() mock_migration.assert_called_once_with( 'INSTANCE_4', 'Node_2', 'Node_1') @@ -154,7 +175,7 @@ class TestWorkloadStabilization(base.TestCase): 'host': 'Node_3'}] ) with mock.patch.object(self.strategy, 'migrate') as mock_migrate: - self.strategy.execute() + self.strategy.do_execute() self.assertEqual(mock_migrate.call_count, 1) def test_execute_nothing_to_migrate(self):