Use parameters instead of config for workload stabilization
This patch set allows to use audit parameters for workload-stabilization strategy and makes some little refactoring. Closes-Bug: #1620604 Change-Id: I60e34611d4dd001beed31666fd11d2ab11c1723c
This commit is contained in:
@@ -17,7 +17,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from copy import deepcopy
|
||||
import copy
|
||||
import itertools
|
||||
import math
|
||||
import random
|
||||
@@ -34,40 +34,8 @@ from watcher.decision_engine.model import element
|
||||
from watcher.decision_engine.strategy.strategies import base
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
metrics = ['cpu_util', 'memory.resident']
|
||||
thresholds_dict = {'cpu_util': 0.2, 'memory.resident': 0.2}
|
||||
weights_dict = {'cpu_util_weight': 1.0, 'memory.resident_weight': 1.0}
|
||||
instance_host_measures = {'cpu_util': 'hardware.cpu.util',
|
||||
'memory.resident': 'hardware.memory.used'}
|
||||
|
||||
ws_opts = [
|
||||
cfg.ListOpt('metrics',
|
||||
default=metrics,
|
||||
required=True,
|
||||
help='Metrics used as rates of cluster loads.'),
|
||||
cfg.DictOpt('thresholds',
|
||||
default=thresholds_dict,
|
||||
help=''),
|
||||
cfg.DictOpt('weights',
|
||||
default=weights_dict,
|
||||
help='These weights used to calculate '
|
||||
'common standard deviation. Name of weight '
|
||||
'contains meter name and _weight suffix.'),
|
||||
cfg.StrOpt('host_choice',
|
||||
default='retry',
|
||||
required=True,
|
||||
help="Method of host's choice."),
|
||||
cfg.IntOpt('retry_count',
|
||||
default=1,
|
||||
required=True,
|
||||
help='Count of random returned hosts.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
CONF.register_opts(ws_opts, 'watcher_strategies.workload_stabilization')
|
||||
|
||||
|
||||
def _set_memoize(conf):
|
||||
oslo_cache.configure(conf)
|
||||
@@ -111,14 +79,12 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
super(WorkloadStabilization, self).__init__(config, osc)
|
||||
self._ceilometer = None
|
||||
self._nova = None
|
||||
self.weights = CONF['watcher_strategies.workload_stabilization']\
|
||||
.weights
|
||||
self.metrics = CONF['watcher_strategies.workload_stabilization']\
|
||||
.metrics
|
||||
self.thresholds = CONF['watcher_strategies.workload_stabilization']\
|
||||
.thresholds
|
||||
self.host_choice = CONF['watcher_strategies.workload_stabilization']\
|
||||
.host_choice
|
||||
self.weights = None
|
||||
self.metrics = None
|
||||
self.thresholds = None
|
||||
self.host_choice = None
|
||||
self.instance_metrics = None
|
||||
self.retry_count = None
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
@@ -132,6 +98,55 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
def get_translatable_display_name(cls):
|
||||
return "Workload stabilization"
|
||||
|
||||
@classmethod
|
||||
def get_schema(cls):
|
||||
return {
|
||||
"properties": {
|
||||
"metrics": {
|
||||
"description": "Metrics used as rates of cluster loads.",
|
||||
"type": "array",
|
||||
"default": ["cpu_util", "memory.resident"]
|
||||
},
|
||||
"thresholds": {
|
||||
"description": "Dict where key is a metric and value "
|
||||
"is a trigger value.",
|
||||
"type": "object",
|
||||
"default": {"cpu_util": 0.2, "memory.resident": 0.2}
|
||||
},
|
||||
"weights": {
|
||||
"description": "These weights used to calculate "
|
||||
"common standard deviation. Name of weight"
|
||||
" contains meter name and _weight suffix.",
|
||||
"type": "object",
|
||||
"default": {"cpu_util_weight": 1.0,
|
||||
"memory.resident_weight": 1.0}
|
||||
},
|
||||
"instance_metrics": {
|
||||
"description": "Mapping to get hardware statistics using"
|
||||
" instance metrics",
|
||||
"type": "object",
|
||||
"default": {"cpu_util": "hardware.cpu.util",
|
||||
"memory.resident": "hardware.memory.used"}
|
||||
},
|
||||
"host_choice": {
|
||||
"description": "Method of host's choice. There are cycle,"
|
||||
" retry and fullsearch methods. "
|
||||
"Cycle will iterate hosts in cycle. "
|
||||
"Retry will get some hosts random "
|
||||
"(count defined in retry_count option). "
|
||||
"Fullsearch will return each host "
|
||||
"from list.",
|
||||
"type": "string",
|
||||
"default": "retry"
|
||||
},
|
||||
"retry_count": {
|
||||
"description": "Count of random returned hosts",
|
||||
"type": "number",
|
||||
"default": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@property
|
||||
def ceilometer(self):
|
||||
if self._ceilometer is None:
|
||||
@@ -190,7 +205,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
return instance_load
|
||||
|
||||
def normalize_hosts_load(self, hosts):
|
||||
normalized_hosts = deepcopy(hosts)
|
||||
normalized_hosts = copy.deepcopy(hosts)
|
||||
for host in normalized_hosts:
|
||||
if 'memory.resident' in normalized_hosts[host]:
|
||||
h_memory = self.compute_model.get_resource_from_id(
|
||||
@@ -213,13 +228,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
for metric in self.metrics:
|
||||
avg_meter = self.ceilometer.statistic_aggregation(
|
||||
resource_id=node_id,
|
||||
meter_name=instance_host_measures[metric],
|
||||
meter_name=self.instance_metrics[metric],
|
||||
period="60",
|
||||
aggregate='avg'
|
||||
)
|
||||
if avg_meter is None:
|
||||
raise exception.NoSuchMetricForHost(
|
||||
metric=instance_host_measures[metric],
|
||||
metric=self.instance_metrics[metric],
|
||||
host=node_id)
|
||||
hosts_load[node_id][metric] = avg_meter
|
||||
return hosts_load
|
||||
@@ -263,7 +278,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
:return: list of standard deviation values
|
||||
"""
|
||||
migration_case = []
|
||||
new_hosts = deepcopy(hosts)
|
||||
new_hosts = copy.deepcopy(hosts)
|
||||
instance_load = self.get_instance_load(instance_id)
|
||||
d_host_vcpus = new_hosts[dst_node_id]['vcpus']
|
||||
s_host_vcpus = new_hosts[src_node_id]['vcpus']
|
||||
@@ -287,22 +302,22 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
def simulate_migrations(self, hosts):
|
||||
"""Make sorted list of pairs instance:dst_host"""
|
||||
def yield_nodes(nodes):
|
||||
ct = CONF['watcher_strategies.workload_stabilization'].retry_count
|
||||
if self.host_choice == 'cycle':
|
||||
for i in itertools.cycle(nodes):
|
||||
yield [i]
|
||||
if self.host_choice == 'retry':
|
||||
while True:
|
||||
yield random.sample(nodes, ct)
|
||||
yield random.sample(nodes, self.retry_count)
|
||||
if self.host_choice == 'fullsearch':
|
||||
while True:
|
||||
yield nodes
|
||||
|
||||
instance_host_map = []
|
||||
for source_hp_id in self.compute_model.get_all_compute_nodes():
|
||||
nodes = list(self.compute_model.get_all_compute_nodes())
|
||||
nodes.remove(source_hp_id)
|
||||
node_list = yield_nodes(nodes)
|
||||
nodes = list(self.compute_model.get_all_compute_nodes())
|
||||
for source_hp_id in nodes:
|
||||
c_nodes = copy.copy(nodes)
|
||||
c_nodes.remove(source_hp_id)
|
||||
node_list = yield_nodes(c_nodes)
|
||||
instances_id = self.compute_model.get_mapping(). \
|
||||
get_node_instances_from_id(source_hp_id)
|
||||
for instance_id in instances_id:
|
||||
@@ -323,7 +338,6 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
'host': dst_node_id, 'value': weighted_sd,
|
||||
's_host': source_hp_id, 'instance': instance_id}
|
||||
instance_host_map.append(min_sd_case)
|
||||
break
|
||||
return sorted(instance_host_map, key=lambda x: x['value'])
|
||||
|
||||
def check_threshold(self):
|
||||
@@ -375,6 +389,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
if not self.compute_model:
|
||||
raise exception.ClusterStateNotDefined()
|
||||
|
||||
self.weights = self.input_parameters.weights
|
||||
self.metrics = self.input_parameters.metrics
|
||||
self.thresholds = self.input_parameters.thresholds
|
||||
self.host_choice = self.input_parameters.host_choice
|
||||
self.instance_metrics = self.input_parameters.instance_metrics
|
||||
self.retry_count = self.input_parameters.retry_count
|
||||
|
||||
def do_execute(self):
|
||||
migration = self.check_threshold()
|
||||
if migration:
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from watcher.common import utils
|
||||
from watcher.decision_engine.model import model_root
|
||||
from watcher.decision_engine.strategy import strategies
|
||||
from watcher.tests import base
|
||||
@@ -62,6 +63,26 @@ class TestWorkloadStabilization(base.TestCase):
|
||||
self.m_ceilometer.return_value = mock.Mock(
|
||||
statistic_aggregation=self.fake_metrics.mock_get_statistics)
|
||||
self.strategy = strategies.WorkloadStabilization(config=mock.Mock())
|
||||
self.strategy.input_parameters = utils.Struct()
|
||||
self.strategy.input_parameters.update(
|
||||
{'metrics': ["cpu_util", "memory.resident"],
|
||||
'thresholds': {"cpu_util": 0.2, "memory.resident": 0.2},
|
||||
'weights': {"cpu_util_weight": 1.0,
|
||||
"memory.resident_weight": 1.0},
|
||||
'instance_metrics':
|
||||
{"cpu_util": "hardware.cpu.util",
|
||||
"memory.resident": "hardware.memory.used"},
|
||||
'host_choice': 'retry',
|
||||
'retry_count': 1})
|
||||
self.strategy.metrics = ["cpu_util", "memory.resident"]
|
||||
self.strategy.thresholds = {"cpu_util": 0.2, "memory.resident": 0.2}
|
||||
self.strategy.weights = {"cpu_util_weight": 1.0,
|
||||
"memory.resident_weight": 1.0}
|
||||
self.strategy.instance_metrics = {"cpu_util": "hardware.cpu.util",
|
||||
"memory.resident":
|
||||
"hardware.memory.used"}
|
||||
self.strategy.host_choice = 'retry'
|
||||
self.strategy.retry_count = 1
|
||||
|
||||
def test_get_instance_load(self):
|
||||
self.m_model.return_value = self.fake_cluster.generate_scenario_1()
|
||||
@@ -138,7 +159,7 @@ class TestWorkloadStabilization(base.TestCase):
|
||||
'host': 'Node_1'}]
|
||||
)
|
||||
with mock.patch.object(self.strategy, 'migrate') as mock_migration:
|
||||
self.strategy.execute()
|
||||
self.strategy.do_execute()
|
||||
mock_migration.assert_called_once_with(
|
||||
'INSTANCE_4', 'Node_2', 'Node_1')
|
||||
|
||||
@@ -154,7 +175,7 @@ class TestWorkloadStabilization(base.TestCase):
|
||||
'host': 'Node_3'}]
|
||||
)
|
||||
with mock.patch.object(self.strategy, 'migrate') as mock_migrate:
|
||||
self.strategy.execute()
|
||||
self.strategy.do_execute()
|
||||
self.assertEqual(mock_migrate.call_count, 1)
|
||||
|
||||
def test_execute_nothing_to_migrate(self):
|
||||
|
||||
Reference in New Issue
Block a user