diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py
index 4f1befd43..286ff0922 100644
--- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py
+++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py
@@ -52,13 +52,16 @@ correctly on all compute nodes within the cluster.
 This strategy assumes it is possible to live migrate any VM from
 an active compute node to any other active compute node.
 """
+import datetime
 
+from oslo_config import cfg
 from oslo_log import log
 import six
 
 from watcher._i18n import _
 from watcher.common import exception
 from watcher.datasource import ceilometer as ceil
+from watcher.datasource import gnocchi as gnoc
 from watcher.decision_engine.model import element
 from watcher.decision_engine.strategy.strategies import base
 
@@ -68,12 +71,32 @@ LOG = log.getLogger(__name__)
 class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
     """VM Workload Consolidation Strategy"""
 
+    HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent'
+    INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util'
+
+    METRIC_NAMES = dict(
+        ceilometer=dict(
+            cpu_util_metric='cpu_util',
+            ram_util_metric='memory.usage',
+            ram_alloc_metric='memory',
+            disk_alloc_metric='disk.root.size'),
+        gnocchi=dict(
+            cpu_util_metric='cpu_util',
+            ram_util_metric='memory.usage',
+            ram_alloc_metric='memory',
+            disk_alloc_metric='disk.root.size'),
+    )
+
+    MIGRATION = "migrate"
+    CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
+
     def __init__(self, config, osc=None):
         super(VMWorkloadConsolidation, self).__init__(config, osc)
         self._ceilometer = None
+        self._gnocchi = None
         self.number_of_migrations = 0
         self.number_of_released_nodes = 0
-        self.ceilometer_instance_data_cache = dict()
+        self.datasource_instance_data_cache = dict()
 
     @classmethod
     def get_name(cls):
@@ -101,6 +125,20 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
     def ceilometer(self, ceilometer):
         self._ceilometer = ceilometer
 
+    @property
+    def gnocchi(self):
+        if self._gnocchi is None:
+            self._gnocchi = gnoc.GnocchiHelper(osc=self.osc)
+        return self._gnocchi
+
+    @gnocchi.setter
+    def gnocchi(self, gnocchi):
+        self._gnocchi = gnocchi
+
+    @property
+    def granularity(self):
+        return self.input_parameters.get('granularity', 300)
+
     @classmethod
     def get_schema(cls):
         # Mandatory default setting for each element
@@ -111,10 +149,26 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
                 "getting statistic aggregation",
                 "type": "number",
                 "default": 3600
-            }
+            },
+            "granularity": {
+                "description": "The time between two measures in an "
+                               "aggregated timeseries of a metric.",
+                "type": "number",
+                "default": 300
+            },
         }
     }
 
+    @classmethod
+    def get_config_opts(cls):
+        return [
+            cfg.StrOpt(
+                "datasource",
+                help="Data source to use in order to query the needed "
+                     "metrics",
+                default="ceilometer",
+                choices=["ceilometer", "gnocchi"])
+        ]
+
     def get_state_str(self, state):
         """Get resource state in string format.
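With `get_config_opts` in place, the backend choice becomes part of Watcher's per-strategy configuration rather than the audit's input parameters. A minimal sketch of the resulting operator-facing setting, assuming the usual per-strategy group name (the `[watcher_strategies.vm_workload_consolidation]` section below is an assumption, not shown in this patch):

    [watcher_strategies.vm_workload_consolidation]
    # Assumed section name. Selects which helper the strategy queries:
    # "ceilometer" (default) or "gnocchi".
    datasource = gnocchi

`granularity`, by contrast, stays an audit-time input parameter (default 300 seconds), since it describes the spacing of measures in the queried timeseries rather than a deployment-wide choice.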
@@ -139,7 +193,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
         """
         params = {'state': element.ServiceState.ENABLED.value}
         self.solution.add_action(
-            action_type='change_nova_service_state',
+            action_type=self.CHANGE_NOVA_SERVICE_STATE,
             resource_id=node.uuid,
             input_parameters=params)
         self.number_of_released_nodes -= 1
@@ -152,7 +206,7 @@
         """
         params = {'state': element.ServiceState.DISABLED.value}
         self.solution.add_action(
-            action_type='change_nova_service_state',
+            action_type=self.CHANGE_NOVA_SERVICE_STATE,
             resource_id=node.uuid,
             input_parameters=params)
         self.number_of_released_nodes += 1
@@ -189,7 +243,7 @@
         params = {'migration_type': migration_type,
                   'source_node': source_node.uuid,
                   'destination_node': destination_node.uuid}
-        self.solution.add_action(action_type='migrate',
+        self.solution.add_action(action_type=self.MIGRATION,
                                  resource_id=instance.uuid,
                                  input_parameters=params)
         self.number_of_migrations += 1
@@ -205,56 +259,97 @@
                 element.ServiceState.DISABLED.value):
             self.add_action_disable_node(node)
 
-    def get_instance_utilization(self, instance, aggr='avg'):
+    def get_instance_utilization(self, instance):
         """Collect cpu, ram and disk utilization statistics of a VM.
 
         :param instance: instance object
-        :param aggr: string
         :return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
         """
-        if instance.uuid in self.ceilometer_instance_data_cache.keys():
-            return self.ceilometer_instance_data_cache.get(instance.uuid)
+        instance_cpu_util = None
+        instance_ram_util = None
+        instance_disk_util = None
 
-        cpu_util_metric = 'cpu_util'
-        ram_util_metric = 'memory.usage'
+        if instance.uuid in self.datasource_instance_data_cache.keys():
+            return self.datasource_instance_data_cache.get(instance.uuid)
 
-        ram_alloc_metric = 'memory'
-        disk_alloc_metric = 'disk.root.size'
-        instance_cpu_util = self.ceilometer.statistic_aggregation(
-            resource_id=instance.uuid, meter_name=cpu_util_metric,
-            period=self.period, aggregate=aggr)
+        cpu_util_metric = self.METRIC_NAMES[
+            self.config.datasource]['cpu_util_metric']
+        ram_util_metric = self.METRIC_NAMES[
+            self.config.datasource]['ram_util_metric']
+        ram_alloc_metric = self.METRIC_NAMES[
+            self.config.datasource]['ram_alloc_metric']
+        disk_alloc_metric = self.METRIC_NAMES[
+            self.config.datasource]['disk_alloc_metric']
+        if self.config.datasource == "ceilometer":
+            instance_cpu_util = self.ceilometer.statistic_aggregation(
+                resource_id=instance.uuid, meter_name=cpu_util_metric,
+                period=self.period, aggregate='avg')
+            instance_ram_util = self.ceilometer.statistic_aggregation(
+                resource_id=instance.uuid, meter_name=ram_util_metric,
+                period=self.period, aggregate='avg')
+            if not instance_ram_util:
+                instance_ram_util = self.ceilometer.statistic_aggregation(
+                    resource_id=instance.uuid, meter_name=ram_alloc_metric,
+                    period=self.period, aggregate='avg')
+            instance_disk_util = self.ceilometer.statistic_aggregation(
+                resource_id=instance.uuid, meter_name=disk_alloc_metric,
+                period=self.period, aggregate='avg')
+        elif self.config.datasource == "gnocchi":
+            stop_time = datetime.datetime.utcnow()
+            start_time = stop_time - datetime.timedelta(
+                seconds=int(self.period))
+            instance_cpu_util = self.gnocchi.statistic_aggregation(
+                resource_id=instance.uuid,
+                metric=cpu_util_metric,
+                granularity=self.granularity,
+                start_time=start_time,
+                stop_time=stop_time,
+                aggregation='mean'
+            )
+            instance_ram_util = self.gnocchi.statistic_aggregation(
+                resource_id=instance.uuid,
+                metric=ram_util_metric,
+                granularity=self.granularity,
+                start_time=start_time,
+                stop_time=stop_time,
+                aggregation='mean'
+            )
+            if not instance_ram_util:
+                instance_ram_util = self.gnocchi.statistic_aggregation(
+                    resource_id=instance.uuid,
+                    metric=ram_alloc_metric,
+                    granularity=self.granularity,
+                    start_time=start_time,
+                    stop_time=stop_time,
+                    aggregation='mean'
+                )
+            instance_disk_util = self.gnocchi.statistic_aggregation(
+                resource_id=instance.uuid,
+                metric=disk_alloc_metric,
+                granularity=self.granularity,
+                start_time=start_time,
+                stop_time=stop_time,
+                aggregation='mean'
+            )
 
         if instance_cpu_util:
             total_cpu_utilization = (
                 instance.vcpus * (instance_cpu_util / 100.0))
         else:
             total_cpu_utilization = instance.vcpus
 
-        instance_ram_util = self.ceilometer.statistic_aggregation(
-            resource_id=instance.uuid, meter_name=ram_util_metric,
-            period=self.period, aggregate=aggr)
-
-        if not instance_ram_util:
-            instance_ram_util = self.ceilometer.statistic_aggregation(
-                resource_id=instance.uuid, meter_name=ram_alloc_metric,
-                period=self.period, aggregate=aggr)
-
-        instance_disk_util = self.ceilometer.statistic_aggregation(
-            resource_id=instance.uuid, meter_name=disk_alloc_metric,
-            period=self.period, aggregate=aggr)
-
         if not instance_ram_util or not instance_disk_util:
             LOG.error(
                 'No values returned by %s for memory.usage '
                 'or disk.root.size', instance.uuid)
             raise exception.NoDataFound
 
-        self.ceilometer_instance_data_cache[instance.uuid] = dict(
+        self.datasource_instance_data_cache[instance.uuid] = dict(
             cpu=total_cpu_utilization, ram=instance_ram_util,
             disk=instance_disk_util)
-        return self.ceilometer_instance_data_cache.get(instance.uuid)
+        return self.datasource_instance_data_cache.get(instance.uuid)
 
-    def get_node_utilization(self, node, aggr='avg'):
+    def get_node_utilization(self, node):
         """Collect cpu, ram and disk utilization statistics of a node.
 
         :param node: node object
@@ -267,7 +363,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
         node_cpu_util = 0
         for instance in node_instances:
             instance_util = self.get_instance_utilization(
-                instance, aggr)
+                instance)
             node_cpu_util += instance_util['cpu']
             node_ram_util += instance_util['ram']
             node_disk_util += instance_util['disk']
@@ -372,7 +468,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
         """
         migrate_actions = (
             a for a in self.solution.actions if a[
-                'action_type'] == 'migrate')
+                'action_type'] == self.MIGRATION)
         instance_to_be_migrated = (
             a['input_parameters']['resource_id'] for a in migrate_actions)
         instance_uuids = list(set(instance_to_be_migrated))
diff --git a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py
index e0664158a..3c40d6657 100644
--- a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py
+++ b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py
@@ -157,3 +157,86 @@ class FakeCeilometerMetrics(object):
         instance_disk_util['INSTANCE_8'] = 25
         instance_disk_util['INSTANCE_9'] = 25
         return instance_disk_util[str(r_id)]
+
+
+class FakeGnocchiMetrics(object):
+    def __init__(self, model):
+        self.model = model
+
+    def mock_get_statistics(self, resource_id, metric, granularity,
+                            start_time, stop_time, aggregation='mean'):
+        if metric == "compute.node.cpu.percent":
+            return self.get_node_cpu_util(resource_id)
+        elif metric == "cpu_util":
+            return self.get_instance_cpu_util(resource_id)
+        elif metric == "memory.usage":
+            return self.get_instance_ram_util(resource_id)
+        elif metric == "disk.root.size":
+            return self.get_instance_disk_root_size(resource_id)
+
+    def get_node_cpu_util(self, r_id):
+        """Calculate node utilization dynamically.
+
+        Node CPU utilization should consider
+        and correlate with actual instance-node mappings
+        provided within a cluster model.
+        Returns relative node CPU utilization <0, 100>.
+
+        :param r_id: resource id
+        """
+        node_uuid = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1])
+        node = self.model.get_node_by_uuid(node_uuid)
+        instances = self.model.get_node_instances(node)
+        util_sum = 0.0
+        for instance_uuid in instances:
+            instance = self.model.get_instance_by_uuid(instance_uuid)
+            total_cpu_util = instance.vcpus * self.get_instance_cpu_util(
+                instance.uuid)
+            util_sum += total_cpu_util / 100.0
+        util_sum /= node.vcpus
+        return util_sum * 100.0
+
+    @staticmethod
+    def get_instance_cpu_util(r_id):
+        instance_cpu_util = dict()
+        instance_cpu_util['INSTANCE_0'] = 10
+        instance_cpu_util['INSTANCE_1'] = 30
+        instance_cpu_util['INSTANCE_2'] = 60
+        instance_cpu_util['INSTANCE_3'] = 20
+        instance_cpu_util['INSTANCE_4'] = 40
+        instance_cpu_util['INSTANCE_5'] = 50
+        instance_cpu_util['INSTANCE_6'] = 100
+        instance_cpu_util['INSTANCE_7'] = 100
+        instance_cpu_util['INSTANCE_8'] = 100
+        instance_cpu_util['INSTANCE_9'] = 100
+        return instance_cpu_util[str(r_id)]
+
+    @staticmethod
+    def get_instance_ram_util(r_id):
+        instance_ram_util = dict()
+        instance_ram_util['INSTANCE_0'] = 1
+        instance_ram_util['INSTANCE_1'] = 2
+        instance_ram_util['INSTANCE_2'] = 4
+        instance_ram_util['INSTANCE_3'] = 8
+        instance_ram_util['INSTANCE_4'] = 3
+        instance_ram_util['INSTANCE_5'] = 2
+        instance_ram_util['INSTANCE_6'] = 1
+        instance_ram_util['INSTANCE_7'] = 2
+        instance_ram_util['INSTANCE_8'] = 4
+        instance_ram_util['INSTANCE_9'] = 8
+        return instance_ram_util[str(r_id)]
+
+    @staticmethod
+    def get_instance_disk_root_size(r_id):
+        instance_disk_util = dict()
+        instance_disk_util['INSTANCE_0'] = 10
+        instance_disk_util['INSTANCE_1'] = 15
+        instance_disk_util['INSTANCE_2'] = 30
+        instance_disk_util['INSTANCE_3'] = 35
+        instance_disk_util['INSTANCE_4'] = 20
+        instance_disk_util['INSTANCE_5'] = 25
+        instance_disk_util['INSTANCE_6'] = 25
+        instance_disk_util['INSTANCE_7'] = 25
+        instance_disk_util['INSTANCE_8'] = 25
+        instance_disk_util['INSTANCE_9'] = 25
+        return instance_disk_util[str(r_id)]
diff --git a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py
index 872ed8f2c..0f8382410 100644
--- a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py
+++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py
@@ -18,6 +18,7 @@
 # limitations under the License.
 #
 
+import datetime
 import mock
 
 from watcher.common import exception
@@ -29,6 +30,17 @@ from watcher.tests.decision_engine.model import faker_cluster_and_metrics
 
 class TestVMWorkloadConsolidation(base.TestCase):
 
+    scenarios = [
+        ("Ceilometer",
+         {"datasource": "ceilometer",
+          "fake_datasource_cls":
+              faker_cluster_and_metrics.FakeCeilometerMetrics}),
+        ("Gnocchi",
+         {"datasource": "gnocchi",
+          "fake_datasource_cls":
+              faker_cluster_and_metrics.FakeGnocchiMetrics}),
+    ]
+
     def setUp(self):
         super(TestVMWorkloadConsolidation, self).setUp()
 
@@ -41,11 +53,11 @@ class TestVMWorkloadConsolidation(base.TestCase):
         self.m_model = p_model.start()
         self.addCleanup(p_model.stop)
 
-        p_ceilometer = mock.patch.object(
-            strategies.VMWorkloadConsolidation, "ceilometer",
+        p_datasource = mock.patch.object(
+            strategies.VMWorkloadConsolidation, self.datasource,
             new_callable=mock.PropertyMock)
-        self.m_ceilometer = p_ceilometer.start()
-        self.addCleanup(p_ceilometer.stop)
+        self.m_datasource = p_datasource.start()
+        self.addCleanup(p_datasource.stop)
 
         p_audit_scope = mock.patch.object(
             strategies.VMWorkloadConsolidation, "audit_scope",
@@ -57,13 +69,14 @@ class TestVMWorkloadConsolidation(base.TestCase):
         self.m_audit_scope.return_value = mock.Mock()
 
         # fake metrics
-        self.fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
+        self.fake_metrics = self.fake_datasource_cls(
            self.m_model.return_value)
 
         self.m_model.return_value = model_root.ModelRoot()
-        self.m_ceilometer.return_value = mock.Mock(
+        self.m_datasource.return_value = mock.Mock(
             statistic_aggregation=self.fake_metrics.mock_get_statistics)
-        self.strategy = strategies.VMWorkloadConsolidation(config=mock.Mock())
+        self.strategy = strategies.VMWorkloadConsolidation(
+            config=mock.Mock(datasource=self.datasource))
 
     def test_exception_stale_cdm(self):
         self.fake_cluster.set_cluster_data_model_as_stale()
@@ -81,7 +94,7 @@
         instance_util = dict(cpu=1.0, ram=1, disk=10)
         self.assertEqual(
             instance_util,
-            self.strategy.get_instance_utilization(instance_0, model))
+            self.strategy.get_instance_utilization(instance_0))
 
     def test_get_node_utilization(self):
         model = self.fake_cluster.generate_scenario_1()
@@ -91,7 +104,7 @@
         node_util = dict(cpu=1.0, ram=1, disk=10)
         self.assertEqual(
             node_util,
-            self.strategy.get_node_utilization(node_0, model))
+            self.strategy.get_node_utilization(node_0))
 
     def test_get_node_capacity(self):
         model = self.fake_cluster.generate_scenario_1()
@@ -301,10 +314,33 @@
             strategies.VMWorkloadConsolidation, "ceilometer")
         m_ceilometer = p_ceilometer.start()
         self.addCleanup(p_ceilometer.stop)
+        p_gnocchi = mock.patch.object(
+            strategies.VMWorkloadConsolidation, "gnocchi")
+        m_gnocchi = p_gnocchi.start()
+        self.addCleanup(p_gnocchi.stop)
+        datetime_patcher = mock.patch.object(
+            datetime, 'datetime',
+            mock.Mock(wraps=datetime.datetime)
+        )
+        mocked_datetime = datetime_patcher.start()
+        mocked_datetime.utcnow.return_value = datetime.datetime(
+            2017, 3, 19, 18, 53, 11, 657417)
+        self.addCleanup(datetime_patcher.stop)
         m_ceilometer.return_value = mock.Mock(
             statistic_aggregation=self.fake_metrics.mock_get_statistics)
+        m_gnocchi.return_value = mock.Mock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
         instance0 = model.get_instance_by_uuid("INSTANCE_0")
         self.strategy.get_instance_utilization(instance0)
-        m_ceilometer.statistic_aggregation.assert_any_call(
-            aggregate='avg', meter_name='disk.root.size',
-            period=3600, resource_id=instance0.uuid)
+        if self.strategy.config.datasource == "ceilometer":
+            m_ceilometer.statistic_aggregation.assert_any_call(
+                aggregate='avg', meter_name='disk.root.size',
+                period=3600, resource_id=instance0.uuid)
+        elif self.strategy.config.datasource == "gnocchi":
+            stop_time = datetime.datetime.utcnow()
+            start_time = stop_time - datetime.timedelta(
+                seconds=3600)
+            m_gnocchi.statistic_aggregation.assert_called_with(
+                resource_id=instance0.uuid, metric='disk.root.size',
+                granularity=300, start_time=start_time, stop_time=stop_time,
+                aggregation='mean')
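The Gnocchi branch of this assertion only passes because the wall clock is frozen: the strategy derives its query window from `datetime.datetime.utcnow()`, so the test pins `utcnow()` and recomputes the identical `start_time`/`stop_time`. A standalone sketch of that frozen-clock pattern, with illustrative names that are not part of this patch:

    import datetime

    import mock

    # Wrapping the real class keeps datetime.datetime(...) construction
    # working while pinning utcnow() to a fixed instant.
    fixed_now = datetime.datetime(2017, 3, 19, 18, 53, 11, 657417)
    patcher = mock.patch.object(
        datetime, 'datetime', mock.Mock(wraps=datetime.datetime))
    mocked_datetime = patcher.start()
    mocked_datetime.utcnow.return_value = fixed_now

    assert datetime.datetime.utcnow() == fixed_now
    # Code under test computing stop_time = datetime.datetime.utcnow()
    # now agrees exactly with the window the test recomputes.
    patcher.stop()

Only the module's `datetime.datetime` attribute is patched, so `datetime.timedelta` and the window arithmetic stay real.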