Add gnocchi support in workload_balance strategy

This patch adds gnocchi support in workload_balance strategy
and adds unit tests corresponding to that change.

Change-Id: I9bc56c7b91b5c3fd0cfe97d75c3bace50ab22532
Partiallly-Implements: bp gnocchi-watcher
This commit is contained in:
Santhosh Fernandes
2017-03-28 13:32:30 +05:30
parent 54f0758fc3
commit e549e43e9e
3 changed files with 144 additions and 20 deletions

View File

@@ -46,12 +46,15 @@ hosts nodes.
algorithm with `CONTINUOUS` audits. algorithm with `CONTINUOUS` audits.
""" """
import datetime
from oslo_config import cfg
from oslo_log import log from oslo_log import log
from watcher._i18n import _ from watcher._i18n import _
from watcher.common import exception as wexc from watcher.common import exception as wexc
from watcher.datasource import ceilometer as ceil from watcher.datasource import ceilometer as ceil
from watcher.datasource import gnocchi as gnoc
from watcher.decision_engine.model import element from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base from watcher.decision_engine.strategy.strategies import base
@@ -104,6 +107,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
# reaches threshold # reaches threshold
self._meter = self.METER_NAME self._meter = self.METER_NAME
self._ceilometer = None self._ceilometer = None
self._gnocchi = None
@property @property
def ceilometer(self): def ceilometer(self):
@@ -115,6 +119,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
def ceilometer(self, c): def ceilometer(self, c):
self._ceilometer = c self._ceilometer = c
@property
def gnocchi(self):
if self._gnocchi is None:
self._gnocchi = gnoc.GnocchiHelper(osc=self.osc)
return self._gnocchi
@gnocchi.setter
def gnocchi(self, gnocchi):
self._gnocchi = gnocchi
@classmethod @classmethod
def get_name(cls): def get_name(cls):
return "workload_balance" return "workload_balance"
@@ -127,6 +141,10 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
def get_translatable_display_name(cls): def get_translatable_display_name(cls):
return "Workload Balance Migration Strategy" return "Workload Balance Migration Strategy"
@property
def granularity(self):
return self.input_parameters.get('granularity', 300)
@classmethod @classmethod
def get_schema(cls): def get_schema(cls):
# Mandatory default setting for each element # Mandatory default setting for each element
@@ -142,9 +160,25 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
"type": "number", "type": "number",
"default": 300 "default": 300
}, },
"granularity": {
"description": "The time between two measures in an "
"aggregated timeseries of a metric.",
"type": "number",
"default": 300
},
}, },
} }
@classmethod
def get_config_opts(cls):
return [
cfg.StrOpt(
"datasource",
help="Data source to use in order to query the needed metrics",
default="ceilometer",
choices=["ceilometer", "gnocchi"])
]
def calculate_used_resource(self, node): def calculate_used_resource(self, node):
"""Calculate the used vcpus, memory and disk based on VM flavors""" """Calculate the used vcpus, memory and disk based on VM flavors"""
instances = self.compute_model.get_node_instances(node) instances = self.compute_model.get_node_instances(node)
@@ -251,15 +285,30 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
instances = self.compute_model.get_node_instances(node) instances = self.compute_model.get_node_instances(node)
node_workload = 0.0 node_workload = 0.0
for instance in instances: for instance in instances:
cpu_util = None
try: try:
cpu_util = self.ceilometer.statistic_aggregation( if self.config.datasource == "ceilometer":
resource_id=instance.uuid, cpu_util = self.ceilometer.statistic_aggregation(
meter_name=self._meter, resource_id=instance.uuid,
period=self._period, meter_name=self._meter,
aggregate='avg') period=self._period,
aggregate='avg')
elif self.config.datasource == "gnocchi":
stop_time = datetime.datetime.utcnow()
start_time = stop_time - datetime.timedelta(
seconds=int(self._period))
cpu_util = self.gnocchi.statistic_aggregation(
resource_id=instance.uuid,
metric=self._meter,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean'
)
except Exception as exc: except Exception as exc:
LOG.exception(exc) LOG.exception(exc)
LOG.error("Can not get cpu_util from Ceilometer") LOG.error("Can not get cpu_util from %s",
self.config.datasource)
continue continue
if cpu_util is None: if cpu_util is None:
LOG.debug("Instance (%s): cpu_util is None", instance.uuid) LOG.debug("Instance (%s): cpu_util is None", instance.uuid)

View File

@@ -45,6 +45,13 @@ class FakeGnocchiMetrics(object):
result = self.get_average_power(resource_id) result = self.get_average_power(resource_id)
return result return result
def mock_get_statistics_wb(self, resource_id, metric, granularity,
start_time, stop_time, aggregation='mean'):
result = 0.0
if metric == "cpu_util":
result = self.get_average_usage_instance_cpu_wb(resource_id)
return result
@staticmethod @staticmethod
def get_average_outlet_temperature(uuid): def get_average_outlet_temperature(uuid):
"""The average outlet temperature for host""" """The average outlet temperature for host"""
@@ -106,8 +113,8 @@ class FakeGnocchiMetrics(object):
def get_usage_node_cpu(uuid): def get_usage_node_cpu(uuid):
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid:00 :param uuid: instance UUID
:return: :return: float value
""" """
# Normalize # Normalize
mock = {} mock = {}
@@ -142,8 +149,8 @@ class FakeGnocchiMetrics(object):
def get_average_usage_instance_cpu(uuid): def get_average_usage_instance_cpu(uuid):
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid:00 :param uuid: instance UUID
:return: :return: int value
""" """
# Normalize # Normalize
@@ -214,3 +221,24 @@ class FakeGnocchiMetrics(object):
mock[uuid] = 4 mock[uuid] = 4
return mock[str(uuid)] return mock[str(uuid)]
@staticmethod
def get_average_usage_instance_cpu_wb(uuid):
"""The last VM CPU usage values to average
:param uuid: instance UUID
:return: float value
"""
# query influxdb stream
# compute in stream
# Normalize
mock = {}
# node 0
mock['INSTANCE_1'] = 80
mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 50
# node 1
mock['INSTANCE_3'] = 20
mock['INSTANCE_4'] = 10
return float(mock[str(uuid)])

View File

@@ -17,6 +17,7 @@
# limitations under the License. # limitations under the License.
# #
import collections import collections
import datetime
import mock import mock
from watcher.applier.loading import default from watcher.applier.loading import default
@@ -27,14 +28,24 @@ from watcher.decision_engine.strategy import strategies
from watcher.tests import base from watcher.tests import base
from watcher.tests.decision_engine.model import ceilometer_metrics from watcher.tests.decision_engine.model import ceilometer_metrics
from watcher.tests.decision_engine.model import faker_cluster_state from watcher.tests.decision_engine.model import faker_cluster_state
from watcher.tests.decision_engine.model import gnocchi_metrics
class TestWorkloadBalance(base.TestCase): class TestWorkloadBalance(base.TestCase):
scenarios = [
("Ceilometer",
{"datasource": "ceilometer",
"fake_datasource_cls": ceilometer_metrics.FakeCeilometerMetrics}),
("Gnocchi",
{"datasource": "gnocchi",
"fake_datasource_cls": gnocchi_metrics.FakeGnocchiMetrics}),
]
def setUp(self): def setUp(self):
super(TestWorkloadBalance, self).setUp() super(TestWorkloadBalance, self).setUp()
# fake metrics # fake metrics
self.fake_metrics = ceilometer_metrics.FakeCeilometerMetrics() self.fake_metrics = self.fake_datasource_cls()
# fake cluster # fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector() self.fake_cluster = faker_cluster_state.FakerModelCollector()
@@ -44,11 +55,11 @@ class TestWorkloadBalance(base.TestCase):
self.m_model = p_model.start() self.m_model = p_model.start()
self.addCleanup(p_model.stop) self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object( p_datasource = mock.patch.object(
strategies.WorkloadBalance, "ceilometer", strategies.WorkloadBalance, self.datasource,
new_callable=mock.PropertyMock) new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start() self.m_datasource = p_datasource.start()
self.addCleanup(p_ceilometer.stop) self.addCleanup(p_datasource.stop)
p_audit_scope = mock.patch.object( p_audit_scope = mock.patch.object(
strategies.WorkloadBalance, "audit_scope", strategies.WorkloadBalance, "audit_scope",
@@ -58,11 +69,10 @@ class TestWorkloadBalance(base.TestCase):
self.addCleanup(p_audit_scope.stop) self.addCleanup(p_audit_scope.stop)
self.m_audit_scope.return_value = mock.Mock() self.m_audit_scope.return_value = mock.Mock()
self.m_datasource.return_value = mock.Mock(
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
self.strategy = strategies.WorkloadBalance(config=mock.Mock()) self.strategy = strategies.WorkloadBalance(
config=mock.Mock(datasource=self.datasource))
self.strategy.input_parameters = utils.Struct() self.strategy.input_parameters = utils.Struct()
self.strategy.input_parameters.update({'threshold': 25.0, self.strategy.input_parameters.update({'threshold': 25.0,
'period': 300}) 'period': 300})
@@ -110,7 +120,7 @@ class TestWorkloadBalance(base.TestCase):
def test_filter_destination_hosts(self): def test_filter_destination_hosts(self):
model = self.fake_cluster.generate_scenario_6_with_2_nodes() model = self.fake_cluster.generate_scenario_6_with_2_nodes()
self.m_model.return_value = model self.m_model.return_value = model
self.strategy.ceilometer = mock.MagicMock( self.strategy.datasource = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
n1, n2, avg, w_map = self.strategy.group_hosts_by_cpu_util() n1, n2, avg, w_map = self.strategy.group_hosts_by_cpu_util()
instance_to_mig = self.strategy.choose_instance_to_migrate( instance_to_mig = self.strategy.choose_instance_to_migrate(
@@ -168,3 +178,40 @@ class TestWorkloadBalance(base.TestCase):
loaded_action = loader.load(action['action_type']) loaded_action = loader.load(action['action_type'])
loaded_action.input_parameters = action['input_parameters'] loaded_action.input_parameters = action['input_parameters']
loaded_action.validate_parameters() loaded_action.validate_parameters()
def test_periods(self):
model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model
p_ceilometer = mock.patch.object(
strategies.WorkloadBalance, "ceilometer")
m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
p_gnocchi = mock.patch.object(strategies.WorkloadBalance, "gnocchi")
m_gnocchi = p_gnocchi.start()
self.addCleanup(p_gnocchi.stop)
datetime_patcher = mock.patch.object(
datetime, 'datetime',
mock.Mock(wraps=datetime.datetime)
)
mocked_datetime = datetime_patcher.start()
mocked_datetime.utcnow.return_value = datetime.datetime(
2017, 3, 19, 18, 53, 11, 657417)
self.addCleanup(datetime_patcher.stop)
m_ceilometer.statistic_aggregation = mock.Mock(
side_effect=self.fake_metrics.mock_get_statistics_wb)
m_gnocchi.statistic_aggregation = mock.Mock(
side_effect=self.fake_metrics.mock_get_statistics_wb)
instance0 = model.get_instance_by_uuid("INSTANCE_0")
self.strategy.group_hosts_by_cpu_util()
if self.strategy.config.datasource == "ceilometer":
m_ceilometer.statistic_aggregation.assert_any_call(
aggregate='avg', meter_name='cpu_util',
period=300, resource_id=instance0.uuid)
elif self.strategy.config.datasource == "gnocchi":
stop_time = datetime.datetime.utcnow()
start_time = stop_time - datetime.timedelta(
seconds=int('300'))
m_gnocchi.statistic_aggregation.assert_called_with(
resource_id=mock.ANY, metric='cpu_util',
granularity=300, start_time=start_time, stop_time=stop_time,
aggregation='mean')