diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index 0f3067299..8107410ef 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -46,12 +46,15 @@ hosts nodes. algorithm with `CONTINUOUS` audits. """ +import datetime +from oslo_config import cfg from oslo_log import log from watcher._i18n import _ from watcher.common import exception as wexc from watcher.datasource import ceilometer as ceil +from watcher.datasource import gnocchi as gnoc from watcher.decision_engine.model import element from watcher.decision_engine.strategy.strategies import base @@ -104,6 +107,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): # reaches threshold self._meter = self.METER_NAME self._ceilometer = None + self._gnocchi = None @property def ceilometer(self): @@ -115,6 +119,16 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): def ceilometer(self, c): self._ceilometer = c + @property + def gnocchi(self): + if self._gnocchi is None: + self._gnocchi = gnoc.GnocchiHelper(osc=self.osc) + return self._gnocchi + + @gnocchi.setter + def gnocchi(self, gnocchi): + self._gnocchi = gnocchi + @classmethod def get_name(cls): return "workload_balance" @@ -127,6 +141,10 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): def get_translatable_display_name(cls): return "Workload Balance Migration Strategy" + @property + def granularity(self): + return self.input_parameters.get('granularity', 300) + @classmethod def get_schema(cls): # Mandatory default setting for each element @@ -142,9 +160,25 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): "type": "number", "default": 300 }, + "granularity": { + "description": "The time between two measures in an " + "aggregated timeseries of a metric.", + "type": "number", + "default": 300 + }, }, } + @classmethod + def get_config_opts(cls): + return [ + cfg.StrOpt( + "datasource", + help="Data source to use in order to query the needed metrics", + default="ceilometer", + choices=["ceilometer", "gnocchi"]) + ] + def calculate_used_resource(self, node): """Calculate the used vcpus, memory and disk based on VM flavors""" instances = self.compute_model.get_node_instances(node) @@ -251,15 +285,30 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): instances = self.compute_model.get_node_instances(node) node_workload = 0.0 for instance in instances: + cpu_util = None try: - cpu_util = self.ceilometer.statistic_aggregation( - resource_id=instance.uuid, - meter_name=self._meter, - period=self._period, - aggregate='avg') + if self.config.datasource == "ceilometer": + cpu_util = self.ceilometer.statistic_aggregation( + resource_id=instance.uuid, + meter_name=self._meter, + period=self._period, + aggregate='avg') + elif self.config.datasource == "gnocchi": + stop_time = datetime.datetime.utcnow() + start_time = stop_time - datetime.timedelta( + seconds=int(self._period)) + cpu_util = self.gnocchi.statistic_aggregation( + resource_id=instance.uuid, + metric=self._meter, + granularity=self.granularity, + start_time=start_time, + stop_time=stop_time, + aggregation='mean' + ) except Exception as exc: LOG.exception(exc) - LOG.error("Can not get cpu_util from Ceilometer") + LOG.error("Can not get cpu_util from %s", + self.config.datasource) continue if cpu_util is None: LOG.debug("Instance (%s): cpu_util is None", instance.uuid) diff --git a/watcher/tests/decision_engine/model/gnocchi_metrics.py b/watcher/tests/decision_engine/model/gnocchi_metrics.py index 8b5e02a9a..982bcac6c 100644 --- a/watcher/tests/decision_engine/model/gnocchi_metrics.py +++ b/watcher/tests/decision_engine/model/gnocchi_metrics.py @@ -45,6 +45,13 @@ class FakeGnocchiMetrics(object): result = self.get_average_power(resource_id) return result + def mock_get_statistics_wb(self, resource_id, metric, granularity, + start_time, stop_time, aggregation='mean'): + result = 0.0 + if metric == "cpu_util": + result = self.get_average_usage_instance_cpu_wb(resource_id) + return result + @staticmethod def get_average_outlet_temperature(uuid): """The average outlet temperature for host""" @@ -106,8 +113,8 @@ class FakeGnocchiMetrics(object): def get_usage_node_cpu(uuid): """The last VM CPU usage values to average - :param uuid:00 - :return: + :param uuid: instance UUID + :return: float value """ # Normalize mock = {} @@ -142,8 +149,8 @@ class FakeGnocchiMetrics(object): def get_average_usage_instance_cpu(uuid): """The last VM CPU usage values to average - :param uuid:00 - :return: + :param uuid: instance UUID + :return: int value """ # Normalize @@ -214,3 +221,24 @@ class FakeGnocchiMetrics(object): mock[uuid] = 4 return mock[str(uuid)] + + @staticmethod + def get_average_usage_instance_cpu_wb(uuid): + """The last VM CPU usage values to average + + :param uuid: instance UUID + :return: float value + """ + # query influxdb stream + + # compute in stream + + # Normalize + mock = {} + # node 0 + mock['INSTANCE_1'] = 80 + mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 50 + # node 1 + mock['INSTANCE_3'] = 20 + mock['INSTANCE_4'] = 10 + return float(mock[str(uuid)]) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index 7c8d2aab9..36e06e641 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -17,6 +17,7 @@ # limitations under the License. # import collections +import datetime import mock from watcher.applier.loading import default @@ -27,14 +28,24 @@ from watcher.decision_engine.strategy import strategies from watcher.tests import base from watcher.tests.decision_engine.model import ceilometer_metrics from watcher.tests.decision_engine.model import faker_cluster_state +from watcher.tests.decision_engine.model import gnocchi_metrics class TestWorkloadBalance(base.TestCase): + scenarios = [ + ("Ceilometer", + {"datasource": "ceilometer", + "fake_datasource_cls": ceilometer_metrics.FakeCeilometerMetrics}), + ("Gnocchi", + {"datasource": "gnocchi", + "fake_datasource_cls": gnocchi_metrics.FakeGnocchiMetrics}), + ] + def setUp(self): super(TestWorkloadBalance, self).setUp() # fake metrics - self.fake_metrics = ceilometer_metrics.FakeCeilometerMetrics() + self.fake_metrics = self.fake_datasource_cls() # fake cluster self.fake_cluster = faker_cluster_state.FakerModelCollector() @@ -44,11 +55,11 @@ class TestWorkloadBalance(base.TestCase): self.m_model = p_model.start() self.addCleanup(p_model.stop) - p_ceilometer = mock.patch.object( - strategies.WorkloadBalance, "ceilometer", + p_datasource = mock.patch.object( + strategies.WorkloadBalance, self.datasource, new_callable=mock.PropertyMock) - self.m_ceilometer = p_ceilometer.start() - self.addCleanup(p_ceilometer.stop) + self.m_datasource = p_datasource.start() + self.addCleanup(p_datasource.stop) p_audit_scope = mock.patch.object( strategies.WorkloadBalance, "audit_scope", @@ -58,11 +69,10 @@ class TestWorkloadBalance(base.TestCase): self.addCleanup(p_audit_scope.stop) self.m_audit_scope.return_value = mock.Mock() - - self.m_model.return_value = model_root.ModelRoot() - self.m_ceilometer.return_value = mock.Mock( + self.m_datasource.return_value = mock.Mock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) - self.strategy = strategies.WorkloadBalance(config=mock.Mock()) + self.strategy = strategies.WorkloadBalance( + config=mock.Mock(datasource=self.datasource)) self.strategy.input_parameters = utils.Struct() self.strategy.input_parameters.update({'threshold': 25.0, 'period': 300}) @@ -110,7 +120,7 @@ class TestWorkloadBalance(base.TestCase): def test_filter_destination_hosts(self): model = self.fake_cluster.generate_scenario_6_with_2_nodes() self.m_model.return_value = model - self.strategy.ceilometer = mock.MagicMock( + self.strategy.datasource = mock.MagicMock( statistic_aggregation=self.fake_metrics.mock_get_statistics_wb) n1, n2, avg, w_map = self.strategy.group_hosts_by_cpu_util() instance_to_mig = self.strategy.choose_instance_to_migrate( @@ -168,3 +178,40 @@ class TestWorkloadBalance(base.TestCase): loaded_action = loader.load(action['action_type']) loaded_action.input_parameters = action['input_parameters'] loaded_action.validate_parameters() + + def test_periods(self): + model = self.fake_cluster.generate_scenario_1() + self.m_model.return_value = model + p_ceilometer = mock.patch.object( + strategies.WorkloadBalance, "ceilometer") + m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + p_gnocchi = mock.patch.object(strategies.WorkloadBalance, "gnocchi") + m_gnocchi = p_gnocchi.start() + self.addCleanup(p_gnocchi.stop) + datetime_patcher = mock.patch.object( + datetime, 'datetime', + mock.Mock(wraps=datetime.datetime) + ) + mocked_datetime = datetime_patcher.start() + mocked_datetime.utcnow.return_value = datetime.datetime( + 2017, 3, 19, 18, 53, 11, 657417) + self.addCleanup(datetime_patcher.stop) + m_ceilometer.statistic_aggregation = mock.Mock( + side_effect=self.fake_metrics.mock_get_statistics_wb) + m_gnocchi.statistic_aggregation = mock.Mock( + side_effect=self.fake_metrics.mock_get_statistics_wb) + instance0 = model.get_instance_by_uuid("INSTANCE_0") + self.strategy.group_hosts_by_cpu_util() + if self.strategy.config.datasource == "ceilometer": + m_ceilometer.statistic_aggregation.assert_any_call( + aggregate='avg', meter_name='cpu_util', + period=300, resource_id=instance0.uuid) + elif self.strategy.config.datasource == "gnocchi": + stop_time = datetime.datetime.utcnow() + start_time = stop_time - datetime.timedelta( + seconds=int('300')) + m_gnocchi.statistic_aggregation.assert_called_with( + resource_id=mock.ANY, metric='cpu_util', + granularity=300, start_time=start_time, stop_time=stop_time, + aggregation='mean')