From 84cb589aa9294b95e026c9fa4a30daac704d0c8c Mon Sep 17 00:00:00 2001 From: Dantali0n Date: Wed, 1 May 2019 10:09:58 +0200 Subject: [PATCH] formal datasource interface implementation Changes to the baseclass for datasources so strategies can be made compatible with every datasource. Baseclass methods clearly describe expected values and types for both parameters and for method returns. query_retry has been added as base method since every current datasource implements it. Ceilometer is updated to work with the new baseclass. Several methods which are not part of the baseclass and are not used by any strategies are removed. The signature of these methods would have to be changed to fit with the new base class while it would limit strategies to only work with Ceilometer. Gnocchi is updated to work with the new baseclass. Gnocchi and Ceilometer will perform a transformation for the host_airflow metric as it is retrieved as 1/10th of the actual CFM. Monasca is updated to work with the new baseclass. FakeMetrics for Gnocchi, Monasca and Ceilometer are updated to work with the new method signatures of the baseclass. FakeClusterAndMetrics for Ceilometer and Gnocchi are updated to work with the new method signatures of the baseclass. The strategies workload_balance, vm_workload_consolidation, workload_stabilization, basic_consolidation, noisy_neighbor, outlet_temp_control and uniform_airflow are updated to work with the new datasource baseclass. This patch will break compatibility with plugin strategies and datasources due to the changes in signatures. 
Depends-on: I7aa52a9b82f4aa849f2378d4d1c03453e45c0c78 Change-Id: Ie30ca3dbf01062cbb20d3be5d514ec6b5155cd7c Implements: blueprint formal-datasource-interface --- watcher/datasources/base.py | 141 +++++++++-- watcher/datasources/ceilometer.py | 223 ++++++++--------- watcher/datasources/gnocchi.py | 174 +++++++------- watcher/datasources/monasca.py | 153 +++++------- .../decision_engine/model/element/instance.py | 1 - .../strategies/basic_consolidation.py | 40 +--- .../strategy/strategies/noisy_neighbor.py | 9 +- .../strategies/outlet_temp_control.py | 26 +- .../strategy/strategies/uniform_airflow.py | 52 ++-- .../strategies/vm_workload_consolidation.py | 45 +--- .../strategy/strategies/workload_balance.py | 33 ++- .../strategies/workload_stabilization.py | 69 +++--- .../datasources/test_ceilometer_helper.py | 127 ++++------ .../tests/datasources/test_gnocchi_helper.py | 105 ++++---- .../tests/datasources/test_monasca_helper.py | 56 +---- .../model/ceilometer_metrics.py | 205 ++++++++-------- .../model/faker_cluster_and_metrics.py | 100 ++++---- .../decision_engine/model/gnocchi_metrics.py | 224 +++++++++--------- .../decision_engine/model/monasca_metrics.py | 202 ++-------------- .../strategies/test_basic_consolidation.py | 71 +----- .../strategies/test_outlet_temp_control.py | 6 +- .../strategies/test_workload_balance.py | 16 +- .../strategies/test_workload_stabilization.py | 90 ++++--- 23 files changed, 915 insertions(+), 1253 deletions(-) diff --git a/watcher/datasources/base.py b/watcher/datasources/base.py index 08c75851b..0af04baf0 100644 --- a/watcher/datasources/base.py +++ b/watcher/datasources/base.py @@ -17,9 +17,22 @@ import abc class DataSourceBase(object): + """Base Class for datasources in Watcher + This base class defines the abstract methods that datasources should + implement and contains details on the values expected for parameters as + well as what the values for return types should be. 
+ """ + + """Possible options for the parameters named aggregate""" + AGGREGATES = ['mean', 'min', 'max', 'count'] + + """Possible options for the parameters named resource_type""" + RESOURCE_TYPES = ['compute_node', 'instance', 'bare_metal', 'storage'] + + """Possible metrics a datasource can support and their internal name""" METRIC_MAP = dict(host_cpu_usage=None, - host_memory_usage=None, + host_ram_usage=None, host_outlet_temp=None, host_inlet_temp=None, host_airflow=None, @@ -32,69 +45,155 @@ class DataSourceBase(object): ) @abc.abstractmethod - def statistic_aggregation(self, resource_id=None, meter_name=None, - period=300, granularity=300, dimensions=None, - aggregation='avg', group_by='*'): + def query_retry(self, f, *args, **kargs): + """Attempts to retrieve metrics from the external service + + Attempts to access data from the external service and handles + exceptions upon exception the retrieval should be retried in accordance + to the value of query_max_retries + :param f: The method that performs the actual querying for metrics + :param args: Array of arguments supplied to the method + :param kargs: The amount of arguments supplied to the method + :return: The value as retrieved from the external service + """ pass @abc.abstractmethod def list_metrics(self): + """Returns the supported metrics that the datasource can retrieve + + :return: List of supported metrics containing keys from METRIC_MAP + """ pass @abc.abstractmethod def check_availability(self): + """Tries to contact the datasource to see if it is available + + :return: True or False with true meaning the datasource is available + """ pass @abc.abstractmethod - def get_host_cpu_usage(self, resource_id, period, aggregate, + def statistic_aggregation(self, resource=None, resource_type=None, + meter_name=None, period=300, aggregate='mean', + granularity=300): + """Retrieves and converts metrics based on the specified parameters + + :param resource: Resource object as defined in watcher models 
such as + ComputeNode and Instance + :param resource_type: Indicates which type of object is supplied + to the resource parameter + :param meter_name: The desired metric to retrieve as key from + METRIC_MAP + :param period: Time span to collect metrics from in seconds + :param granularity: Interval between samples in measurements in + seconds + :param aggregate: Aggregation method to extract value from set of + samples + :return: The gathered value for the metric the type of value depends on + the meter_name + """ + + pass + + @abc.abstractmethod + def get_host_cpu_usage(self, resource, period, aggregate, granularity=None): + """Get the cpu usage for a host such as a compute_node + + :return: cpu usage as float ranging between 0 and 100 representing the + total cpu usage as percentage + """ pass @abc.abstractmethod - def get_host_memory_usage(self, resource_id, period, aggregate, - granularity=None): + def get_host_ram_usage(self, resource, period, aggregate, + granularity=None): + """Get the ram usage for a host such as a compute_node + + :return: ram usage as float in megabytes + """ pass @abc.abstractmethod - def get_host_outlet_temp(self, resource_id, period, aggregate, + def get_host_outlet_temp(self, resource, period, aggregate, granularity=None): + """Get the outlet temperature for a host such as compute_node + + :return: outlet temperature as float in degrees celsius + """ pass @abc.abstractmethod - def get_host_inlet_temp(self, resource_id, period, aggregate, + def get_host_inlet_temp(self, resource, period, aggregate, granularity=None): + """Get the inlet temperature for a host such as compute_node + + :return: inlet temperature as float in degrees celsius + """ pass @abc.abstractmethod - def get_host_airflow(self, resource_id, period, aggregate, + def get_host_airflow(self, resource, period, aggregate, granularity=None): + """Get the airflow for a host such as compute_node + + :return: airflow as float in cfm + """ pass @abc.abstractmethod - def 
get_host_power(self, resource_id, period, aggregate, granularity=None): + def get_host_power(self, resource, period, aggregate, + granularity=None): + """Get the power for a host such as compute_node + + :return: power as float in watts + """ pass @abc.abstractmethod - def get_instance_cpu_usage(self, resource_id, period, aggregate, - granularity=None): + def get_instance_cpu_usage(self, resource, period, + aggregate, granularity=None): + """Get the cpu usage for an instance + + :return: cpu usage as float ranging between 0 and 100 representing the + total cpu usage as percentage + """ pass @abc.abstractmethod - def get_instance_ram_usage(self, resource_id, period, aggregate, - granularity=None): + def get_instance_ram_usage(self, resource, period, + aggregate, granularity=None): + """Get the ram usage for an instance + + :return: ram usage as float in megabytes + """ pass @abc.abstractmethod - def get_instance_ram_allocated(self, resource_id, period, aggregate, - granularity=None): + def get_instance_ram_allocated(self, resource, period, + aggregate, granularity=None): + """Get the ram allocated for an instance + + :return: total ram allocated as float in megabytes + """ pass @abc.abstractmethod - def get_instance_l3_cache_usage(self, resource_id, period, aggregate, - granularity=None): + def get_instance_l3_cache_usage(self, resource, period, + aggregate, granularity=None): + """Get the l3 cache usage for an instance + + :return: l3 cache usage as integer in bytes + """ pass @abc.abstractmethod - def get_instance_root_disk_size(self, resource_id, period, aggregate, - granularity=None): + def get_instance_root_disk_size(self, resource, period, + aggregate, granularity=None): + """Get the size of the root disk for an instance + + :return: root disk size as float in gigabytes + """ pass diff --git a/watcher/datasources/ceilometer.py b/watcher/datasources/ceilometer.py index 215ad01de..b5497aecc 100644 --- a/watcher/datasources/ceilometer.py +++ 
b/watcher/datasources/ceilometer.py @@ -41,16 +41,16 @@ class CeilometerHelper(base.DataSourceBase): NAME = 'ceilometer' METRIC_MAP = dict(host_cpu_usage='compute.node.cpu.percent', - instance_cpu_usage='cpu_util', - instance_l3_cache_usage='cpu_l3_cache', + host_ram_usage='hardware.memory.used', host_outlet_temp='hardware.ipmi.node.outlet_temperature', - host_airflow='hardware.ipmi.node.airflow', host_inlet_temp='hardware.ipmi.node.temperature', + host_airflow='hardware.ipmi.node.airflow', host_power='hardware.ipmi.node.power', + instance_cpu_usage='cpu_util', instance_ram_usage='memory.resident', instance_ram_allocated='memory', + instance_l3_cache_usage='cpu_l3_cache', instance_root_disk_size='disk.root.size', - host_memory_usage='hardware.memory.used', ) def __init__(self, osc=None): @@ -139,6 +139,15 @@ class CeilometerHelper(base.DataSourceBase): except Exception: raise + def list_metrics(self): + """List the user's meters.""" + try: + meters = self.query_retry(f=self.ceilometer.meters.list) + except Exception: + return set() + else: + return meters + def check_availability(self): try: self.query_retry(self.ceilometer.resources.list) @@ -152,144 +161,118 @@ class CeilometerHelper(base.DataSourceBase): limit=limit, q=query) - def statistic_list(self, meter_name, query=None, period=None): - """List of statistics.""" - statistics = self.ceilometer.statistics.list( - meter_name=meter_name, - q=query, - period=period) - return statistics - - def list_metrics(self): - """List the user's meters.""" - try: - meters = self.query_retry(f=self.ceilometer.meters.list) - except Exception: - return set() - else: - return meters - - def statistic_aggregation(self, resource_id=None, meter_name=None, - period=300, granularity=300, dimensions=None, - aggregation='avg', group_by='*'): - """Representing a statistic aggregate by operators - - :param resource_id: id of resource to list statistics for. - :param meter_name: Name of meter to list statistics for. 
- :param period: Period in seconds over which to group samples. - :param granularity: frequency of marking metric point, in seconds. - This param isn't used in Ceilometer datasource. - :param dimensions: dimensions (dict). This param isn't used in - Ceilometer datasource. - :param aggregation: Available aggregates are: count, cardinality, - min, max, sum, stddev, avg. Defaults to avg. - :param group_by: list of columns to group the metrics to be returned. - This param isn't used in Ceilometer datasource. - :return: Return the latest statistical data, None if no data. - """ - + def statistic_aggregation(self, resource=None, resource_type=None, + meter_name=None, period=300, granularity=300, + aggregate='mean'): end_time = datetime.datetime.utcnow() - if aggregation == 'mean': - aggregation = 'avg' start_time = end_time - datetime.timedelta(seconds=int(period)) + + meter = self.METRIC_MAP.get(meter_name) + if meter is None: + raise exception.NoSuchMetric() + + if aggregate == 'mean': + aggregate = 'avg' + elif aggregate == 'count': + aggregate = 'avg' + LOG.warning('aggregate type count not supported by ceilometer,' + ' replaced with mean.') + + resource_id = resource.uuid + if resource_type == 'compute_node': + resource_id = "%s_%s" % (resource.uuid, resource.hostname) + query = self.build_query( resource_id=resource_id, start_time=start_time, end_time=end_time) statistic = self.query_retry(f=self.ceilometer.statistics.list, - meter_name=meter_name, + meter_name=meter, q=query, period=period, aggregates=[ - {'func': aggregation}]) + {'func': aggregate}]) item_value = None if statistic: - item_value = statistic[-1]._info.get('aggregate').get(aggregation) + item_value = statistic[-1]._info.get('aggregate').get(aggregate) + if meter_name is 'host_airflow': + # Airflow from hardware.ipmi.node.airflow is reported as + # 1/10 th of actual CFM + item_value *= 10 return item_value - def get_last_sample_values(self, resource_id, meter_name, limit=1): - samples = 
self.query_sample( - meter_name=meter_name, - query=self.build_query(resource_id=resource_id), - limit=limit) - values = [] - for index, sample in enumerate(samples): - values.append( - {'sample_%s' % index: { - 'timestamp': sample._info['timestamp'], - 'value': sample._info['counter_volume']}}) - return values + def get_host_cpu_usage(self, resource, period, + aggregate, granularity=None): - def get_last_sample_value(self, resource_id, meter_name): - samples = self.query_sample( - meter_name=meter_name, - query=self.build_query(resource_id=resource_id)) - if samples: - return samples[-1]._info['counter_volume'] - else: - return False + return self.statistic_aggregation( + resource, 'compute_node', 'host_cpu_usage', period, + aggregate, granularity) - def get_host_cpu_usage(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('host_cpu_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_host_ram_usage(self, resource, period, + aggregate, granularity=None): - def get_instance_cpu_usage(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('instance_cpu_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_ram_usage', period, + aggregate, granularity) - def get_host_memory_usage(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('host_memory_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_host_outlet_temp(self, resource, period, + aggregate, granularity=None): - def get_instance_ram_usage(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('instance_ram_usage') - return self.statistic_aggregation(resource_id, meter_name, 
period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_outlet_temp', period, + aggregate, granularity) - def get_instance_l3_cache_usage(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('instance_l3_cache_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_host_inlet_temp(self, resource, period, + aggregate, granularity=None): - def get_instance_ram_allocated(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('instance_ram_allocated') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_inlet_temp', period, + aggregate, granularity) - def get_instance_root_disk_size(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('instance_root_disk_size') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_host_airflow(self, resource, period, + aggregate, granularity=None): - def get_host_outlet_temp(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('host_outlet_temp') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_airflow', period, + aggregate, granularity) - def get_host_inlet_temp(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('host_inlet_temp') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_host_power(self, resource, period, + aggregate, granularity=None): - def get_host_airflow(self, resource_id, period, aggregate, - granularity=None): - 
meter_name = self.METRIC_MAP.get('host_airflow') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_power', period, + aggregate, granularity) - def get_host_power(self, resource_id, period, aggregate, - granularity=None): - meter_name = self.METRIC_MAP.get('host_power') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_instance_cpu_usage(self, resource, period, + aggregate, granularity=None): + + return self.statistic_aggregation( + resource, 'instance', 'instance_cpu_usage', period, + aggregate, granularity) + + def get_instance_ram_usage(self, resource, period, + aggregate, granularity=None): + + return self.statistic_aggregation( + resource, 'instance', 'instance_ram_usage', period, + aggregate, granularity) + + def get_instance_ram_allocated(self, resource, period, + aggregate, granularity=None): + + return self.statistic_aggregation( + resource, 'instance', 'instance_ram_allocated', period, + aggregate, granularity) + + def get_instance_l3_cache_usage(self, resource, period, + aggregate, granularity=None): + + return self.statistic_aggregation( + resource, 'instance', 'instance_l3_cache_usage', period, + aggregate, granularity) + + def get_instance_root_disk_size(self, resource, period, + aggregate, granularity=None): + + return self.statistic_aggregation( + resource, 'instance', 'instance_root_disk_size', period, + aggregate, granularity) diff --git a/watcher/datasources/gnocchi.py b/watcher/datasources/gnocchi.py index a1ec97161..c4ee110bf 100644 --- a/watcher/datasources/gnocchi.py +++ b/watcher/datasources/gnocchi.py @@ -25,7 +25,6 @@ from oslo_log import log from watcher.common import clients from watcher.common import exception -from watcher.common import utils as common_utils from watcher.datasources import base CONF = cfg.CONF @@ -36,16 +35,16 @@ class 
GnocchiHelper(base.DataSourceBase): NAME = 'gnocchi' METRIC_MAP = dict(host_cpu_usage='compute.node.cpu.percent', - instance_cpu_usage='cpu_util', - instance_l3_cache_usage='cpu_l3_cache', + host_ram_usage='hardware.memory.used', host_outlet_temp='hardware.ipmi.node.outlet_temperature', - host_airflow='hardware.ipmi.node.airflow', host_inlet_temp='hardware.ipmi.node.temperature', + host_airflow='hardware.ipmi.node.airflow', host_power='hardware.ipmi.node.power', + instance_cpu_usage='cpu_util', instance_ram_usage='memory.resident', instance_ram_allocated='memory', + instance_l3_cache_usage='cpu_l3_cache', instance_root_disk_size='disk.root.size', - host_memory_usage='hardware.memory.used', ) def __init__(self, osc=None): @@ -54,6 +53,7 @@ class GnocchiHelper(base.DataSourceBase): self.gnocchi = self.osc.gnocchi() def query_retry(self, f, *args, **kwargs): + # TODO(Dantali0n) move gnocchi query_max_retries into general config for i in range(CONF.gnocchi_client.query_max_retries): try: return f(*args, **kwargs) @@ -78,28 +78,24 @@ class GnocchiHelper(base.DataSourceBase): else: return set([metric['name'] for metric in response]) - def statistic_aggregation(self, resource_id=None, meter_name=None, - period=300, granularity=300, dimensions=None, - aggregation='avg', group_by='*'): - """Representing a statistic aggregate by operators - - :param resource_id: id of resource to list statistics for. - :param meter_name: meter name of which we want the statistics. - :param period: Period in seconds over which to group samples. - :param granularity: frequency of marking metric point, in seconds. - :param dimensions: dimensions (dict). This param isn't used in - Gnocchi datasource. - :param aggregation: Should be chosen in accordance with policy - aggregations. - :param group_by: list of columns to group the metrics to be returned. - This param isn't used in Gnocchi datasource. 
- :return: value of aggregated metric - """ - + def statistic_aggregation(self, resource=None, resource_type=None, + meter_name=None, period=300, aggregate='mean', + granularity=300): stop_time = datetime.utcnow() start_time = stop_time - timedelta(seconds=(int(period))) - if not common_utils.is_uuid_like(resource_id): + meter = self.METRIC_MAP.get(meter_name) + if meter is None: + raise exception.NoSuchMetric() + + if aggregate == 'count': + aggregate = 'mean' + LOG.warning('aggregate type count not supported by gnocchi,' + ' replaced with mean.') + + resource_id = resource.uuid + if resource_type == 'compute_node': + resource_id = "%s_%s" % (resource.uuid, resource.hostname) kwargs = dict(query={"=": {"original_resource_id": resource_id}}, limit=1) resources = self.query_retry( @@ -112,12 +108,12 @@ class GnocchiHelper(base.DataSourceBase): resource_id = resources[0]['id'] raw_kwargs = dict( - metric=meter_name, + metric=meter, start=start_time, stop=stop_time, resource_id=resource_id, granularity=granularity, - aggregation=aggregation, + aggregation=aggregate, ) kwargs = {k: v for k, v in raw_kwargs.items() if k and v} @@ -128,70 +124,88 @@ class GnocchiHelper(base.DataSourceBase): if statistics: # return value of latest measure # measure has structure [time, granularity, value] - return statistics[-1][2] + return_value = statistics[-1][2] - def get_host_cpu_usage(self, resource_id, period, aggregate, + if meter_name is 'host_airflow': + # Airflow from hardware.ipmi.node.airflow is reported as + # 1/10 th of actual CFM + return_value *= 10 + + return return_value + + def get_host_cpu_usage(self, resource, period, aggregate, granularity=300): - meter_name = self.METRIC_MAP.get('host_cpu_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) - def get_instance_cpu_usage(self, resource_id, period, aggregate, - granularity=300): - meter_name = self.METRIC_MAP.get('instance_cpu_usage') - return 
self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_cpu_usage', period, + aggregate, granularity) - def get_host_memory_usage(self, resource_id, period, aggregate, - granularity=300): - meter_name = self.METRIC_MAP.get('host_memory_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + def get_host_ram_usage(self, resource, period, aggregate, + granularity=300): - def get_instance_ram_usage(self, resource_id, period, aggregate, - granularity=300): - meter_name = self.METRIC_MAP.get('instance_ram_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + return self.statistic_aggregation( + resource, 'compute_node', 'host_ram_usage', period, + aggregate, granularity) - def get_instance_l3_cache_usage(self, resource_id, period, aggregate, - granularity=300): - meter_name = self.METRIC_MAP.get('instance_l3_cache_usage') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) - - def get_instance_ram_allocated(self, resource_id, period, aggregate, - granularity=300): - meter_name = self.METRIC_MAP.get('instance_ram_allocated') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) - - def get_instance_root_disk_size(self, resource_id, period, aggregate, - granularity=300): - meter_name = self.METRIC_MAP.get('instance_root_disk_size') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) - - def get_host_outlet_temp(self, resource_id, period, aggregate, + def get_host_outlet_temp(self, resource, period, aggregate, granularity=300): - meter_name = self.METRIC_MAP.get('host_outlet_temp') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, 
aggregation=aggregate) - def get_host_inlet_temp(self, resource_id, period, aggregate, + return self.statistic_aggregation( + resource, 'compute_node', 'host_outlet_temp', period, + aggregate, granularity) + + def get_host_inlet_temp(self, resource, period, aggregate, granularity=300): - meter_name = self.METRIC_MAP.get('host_inlet_temp') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) - def get_host_airflow(self, resource_id, period, aggregate, + return self.statistic_aggregation( + resource, 'compute_node', 'host_inlet_temp', period, + aggregate, granularity) + + def get_host_airflow(self, resource, period, aggregate, granularity=300): - meter_name = self.METRIC_MAP.get('host_airflow') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) - def get_host_power(self, resource_id, period, aggregate, + return self.statistic_aggregation( + resource, 'compute_node', 'host_airflow', period, + aggregate, granularity) + + def get_host_power(self, resource, period, aggregate, granularity=300): - meter_name = self.METRIC_MAP.get('host_power') - return self.statistic_aggregation(resource_id, meter_name, period, - granularity, aggregation=aggregate) + + return self.statistic_aggregation( + resource, 'compute_node', 'host_power', period, + aggregate, granularity) + + def get_instance_cpu_usage(self, resource, period, aggregate, + granularity=300): + + return self.statistic_aggregation( + resource, 'instance', 'instance_cpu_usage', period, + aggregate, granularity) + + def get_instance_ram_usage(self, resource, period, aggregate, + granularity=300): + + return self.statistic_aggregation( + resource, 'instance', 'instance_ram_usage', period, + aggregate, granularity) + + def get_instance_ram_allocated(self, resource, period, aggregate, + granularity=300): + + return self.statistic_aggregation( + resource, 'instance', 'instance_ram_allocated', period, + aggregate, 
granularity) + + def get_instance_l3_cache_usage(self, resource, period, aggregate, + granularity=300): + + return self.statistic_aggregation( + resource, 'instance', 'instance_l3_cache_usage', period, + aggregate, granularity) + + def get_instance_root_disk_size(self, resource, period, aggregate, + granularity=300): + + return self.statistic_aggregation( + resource, 'instance', 'instance_root_disk_size', period, + aggregate, granularity) diff --git a/watcher/datasources/monasca.py b/watcher/datasources/monasca.py index c64c4fac0..5dbee611e 100644 --- a/watcher/datasources/monasca.py +++ b/watcher/datasources/monasca.py @@ -29,16 +29,16 @@ class MonascaHelper(base.DataSourceBase): NAME = 'monasca' METRIC_MAP = dict(host_cpu_usage='cpu.percent', - instance_cpu_usage='vm.cpu.utilization_perc', - instance_l3_cache_usage=None, + host_ram_usage=None, host_outlet_temp=None, - host_airflow=None, host_inlet_temp=None, + host_airflow=None, host_power=None, + instance_cpu_usage='vm.cpu.utilization_perc', instance_ram_usage=None, instance_ram_allocated=None, + instance_l3_cache_usage=None, instance_root_disk_size=None, - host_memory_usage=None, ) def __init__(self, osc=None): @@ -89,63 +89,27 @@ class MonascaHelper(base.DataSourceBase): # monasca API. 
pass - def statistics_list(self, meter_name, dimensions, start_time=None, - end_time=None, period=None,): - """List of statistics.""" - start_timestamp, end_timestamp, period = self._format_time_params( - start_time, end_time, period - ) - raw_kwargs = dict( - name=meter_name, - start_time=start_timestamp, - end_time=end_timestamp, - dimensions=dimensions, - ) - - kwargs = {k: v for k, v in raw_kwargs.items() if k and v} - - statistics = self.query_retry( - f=self.monasca.metrics.list_measurements, **kwargs) - - return statistics - - def statistic_aggregation(self, resource_id=None, meter_name=None, - period=300, granularity=300, dimensions=None, - aggregation='avg', group_by='*'): - """Representing a statistic aggregate by operators - - :param resource_id: id of resource to list statistics for. - This param isn't used in Monasca datasource. - :param meter_name: meter names of which we want the statistics. - :param period: Sampling `period`: In seconds. If no period is given, - only one aggregate statistic is returned. If given, a - faceted result will be returned, divided into given - periods. Periods with no data are ignored. - :param granularity: frequency of marking metric point, in seconds. - This param isn't used in Ceilometer datasource. - :param dimensions: dimensions (dict). - :param aggregation: Should be either 'avg', 'count', 'min' or 'max'. - :param group_by: list of columns to group the metrics to be returned. 
- :return: A list of dict with each dict being a distinct result row - """ - - if dimensions is None: - raise exception.UnsupportedDataSource(datasource='Monasca') - + def statistic_aggregation(self, resource=None, resource_type=None, + meter_name=None, period=300, aggregate='mean', + granularity=300): stop_time = datetime.datetime.utcnow() start_time = stop_time - datetime.timedelta(seconds=(int(period))) - if aggregation == 'mean': - aggregation = 'avg' + meter = self.METRIC_MAP.get(meter_name) + if meter is None: + raise exception.NoSuchMetric() + + if aggregate == 'mean': + aggregate = 'avg' raw_kwargs = dict( - name=meter_name, + name=meter, start_time=start_time.isoformat(), end_time=stop_time.isoformat(), - dimensions=dimensions, + dimensions={'hostname': resource.uuid}, period=period, - statistics=aggregation, - group_by=group_by, + statistics=aggregate, + group_by='*', ) kwargs = {k: v for k, v in raw_kwargs.items() if k and v} @@ -155,67 +119,58 @@ class MonascaHelper(base.DataSourceBase): cpu_usage = None for stat in statistics: - avg_col_idx = stat['columns'].index(aggregation) + avg_col_idx = stat['columns'].index(aggregate) values = [r[avg_col_idx] for r in stat['statistics']] value = float(sum(values)) / len(values) cpu_usage = value return cpu_usage - def get_host_cpu_usage(self, resource_id, period, aggregate, - granularity=None): - metric_name = self.METRIC_MAP.get('host_cpu_usage') - node_uuid = resource_id.split('_')[0] + def get_host_cpu_usage(self, resource, period, + aggregate, granularity=None): return self.statistic_aggregation( - meter_name=metric_name, - dimensions=dict(hostname=node_uuid), - period=period, - aggregation=aggregate - ) + resource, 'compute_node', 'host_cpu_usage', period, aggregate, + granularity) - def get_instance_cpu_usage(self, resource_id, period, aggregate, - granularity=None): - metric_name = self.METRIC_MAP.get('instance_cpu_usage') + def get_host_ram_usage(self, resource, period, + aggregate, granularity=None): + 
raise NotImplementedError + + def get_host_outlet_temp(self, resource, period, + aggregate, granularity=None): + raise NotImplementedError + + def get_host_inlet_temp(self, resource, period, + aggregate, granularity=None): + raise NotImplementedError + + def get_host_airflow(self, resource, period, + aggregate, granularity=None): + raise NotImplementedError + + def get_host_power(self, resource, period, + aggregate, granularity=None): + raise NotImplementedError + + def get_instance_cpu_usage(self, resource, period, + aggregate, granularity=None): return self.statistic_aggregation( - meter_name=metric_name, - dimensions=dict(resource_id=resource_id), - period=period, - aggregation=aggregate - ) + resource, 'instance', 'instance_cpu_usage', period, aggregate, + granularity) - def get_host_memory_usage(self, resource_id, period, aggregate, - granularity=None): + def get_instance_ram_usage(self, resource, period, + aggregate, granularity=None): raise NotImplementedError - def get_instance_ram_usage(self, resource_id, period, aggregate, - granularity=None): + def get_instance_ram_allocated(self, resource, period, + aggregate, granularity=None): raise NotImplementedError - def get_instance_l3_cache_usage(self, resource_id, period, aggregate, - granularity=None): + def get_instance_l3_cache_usage(self, resource, period, + aggregate, granularity=None): raise NotImplementedError - def get_instance_ram_allocated(self, resource_id, period, aggregate, - granularity=None): - raise NotImplementedError - - def get_instance_root_disk_size(self, resource_id, period, aggregate, - granularity=None): - raise NotImplementedError - - def get_host_outlet_temp(self, resource_id, period, aggregate, - granularity=None): - raise NotImplementedError - - def get_host_inlet_temp(self, resource_id, period, aggregate, - granularity=None): - raise NotImplementedError - - def get_host_airflow(self, resource_id, period, aggregate, - granularity=None): - raise NotImplementedError - - def 
get_host_power(self, resource_id, period, aggregate, - granularity=None): + def get_instance_root_disk_size(self, resource, period, + aggregate, granularity=None): raise NotImplementedError diff --git a/watcher/decision_engine/model/element/instance.py b/watcher/decision_engine/model/element/instance.py index 3d631c750..3d3ffffd0 100644 --- a/watcher/decision_engine/model/element/instance.py +++ b/watcher/decision_engine/model/element/instance.py @@ -47,7 +47,6 @@ class Instance(compute_resource.ComputeResource): # 'watcher_exclude' property will be set True. "watcher_exclude": wfields.BooleanField(default=False), "state": wfields.StringField(default=InstanceState.ACTIVE.value), - "memory": wfields.NonNegativeIntegerField(), "disk": wfields.IntegerField(), "disk_capacity": wfields.NonNegativeIntegerField(), diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index d108c0a38..7db961a8f 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -48,23 +48,8 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): that live migration is possible on your OpenStack cluster. 
""" - HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent' - INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util' - DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage'] - METRIC_NAMES = dict( - ceilometer=dict( - host_cpu_usage='compute.node.cpu.percent', - instance_cpu_usage='cpu_util'), - monasca=dict( - host_cpu_usage='cpu.percent', - instance_cpu_usage='vm.cpu.utilization_perc'), - gnocchi=dict( - host_cpu_usage='compute.node.cpu.percent', - instance_cpu_usage='cpu_util'), - ) - CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state" def __init__(self, config, osc=None): @@ -111,7 +96,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def aggregation_method(self): return self.input_parameters.get( 'aggregation_method', - {"instance": 'mean', "node": 'mean'}) + {"instance": 'mean', "compute_node": 'mean'}) @classmethod def get_display_name(cls): @@ -159,12 +144,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): "type": "string", "default": 'mean' }, - "node": { + "compute_node": { "type": "string", "default": 'mean' }, }, - "default": {"instance": 'mean', "node": 'mean'} + "default": {"instance": 'mean', "compute_node": 'mean'} }, }, } @@ -271,16 +256,15 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): # TODO(jed): take in account weight return (score_cores + score_disk + score_memory) / 3 - def get_node_cpu_usage(self, node): - resource_id = "%s_%s" % (node.uuid, node.hostname) + def get_compute_node_cpu_usage(self, compute_node): return self.datasource_backend.get_host_cpu_usage( - resource_id, self.period, self.aggregation_method['node'], - granularity=self.granularity) + compute_node, self.period, self.aggregation_method['compute_node'], + self.granularity) def get_instance_cpu_usage(self, instance): return self.datasource_backend.get_instance_cpu_usage( - instance.uuid, self.period, self.aggregation_method['instance'], - granularity=self.granularity) + instance, self.period, 
self.aggregation_method['instance'], + self.granularity) def calculate_score_node(self, node): """Calculate the score that represent the utilization level @@ -289,7 +273,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :return: Score for the given compute node :rtype: float """ - host_avg_cpu_util = self.get_node_cpu_usage(node) + host_avg_cpu_util = self.get_compute_node_cpu_usage(node) if host_avg_cpu_util is None: resource_id = "%s_%s" % (node.uuid, node.hostname) @@ -297,8 +281,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): "No values returned by %(resource_id)s " "for %(metric_name)s", dict( resource_id=resource_id, - metric_name=self.METRIC_NAMES[ - self.datasource_backend.NAME]['host_cpu_usage'])) + metric_name='host_cpu_usage')) host_avg_cpu_util = 100 total_cores_used = node.vcpus * (host_avg_cpu_util / 100.0) @@ -317,8 +300,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): "No values returned by %(resource_id)s " "for %(metric_name)s", dict( resource_id=instance.uuid, - metric_name=self.METRIC_NAMES[ - self.datasource_backend.NAME]['instance_cpu_usage'])) + metric_name='instance_cpu_usage')) instance_cpu_utilization = 100 total_cores_used = instance.vcpus * (instance_cpu_utilization / 100.0) diff --git a/watcher/decision_engine/strategy/strategies/noisy_neighbor.py b/watcher/decision_engine/strategy/strategies/noisy_neighbor.py index 49568cd78..b9e4c2480 100644 --- a/watcher/decision_engine/strategy/strategies/noisy_neighbor.py +++ b/watcher/decision_engine/strategy/strategies/noisy_neighbor.py @@ -53,14 +53,12 @@ class NoisyNeighbor(base.NoisyNeighborBaseStrategy): DATASOURCE_METRICS = ['instance_l3_cache_usage'] - # The meter to report L3 cache in ceilometer - METER_NAME_L3 = "cpu_l3_cache" DEFAULT_WATCHER_PRIORITY = 5 def __init__(self, config, osc=None): super(NoisyNeighbor, self).__init__(config, osc) - self.meter_name = self.METER_NAME_L3 + self.meter_name = 'instance_l3_cache_usage' 
@classmethod def get_name(cls): @@ -97,10 +95,11 @@ class NoisyNeighbor(base.NoisyNeighborBaseStrategy): def get_current_and_previous_cache(self, instance): try: curr_cache = self.datasource_backend.get_instance_l3_cache_usage( - instance.uuid, self.period, 'mean', granularity=300) + instance, self.meter_name, self.period, + 'mean', granularity=300) previous_cache = 2 * ( self.datasource_backend.get_instance_l3_cache_usage( - instance.uuid, 2 * self.period, + instance, self.meter_name, 2 * self.period, 'mean', granularity=300)) - curr_cache except Exception as exc: diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index b7dada6b1..2e5a5821e 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -78,13 +78,6 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): DATASOURCE_METRICS = ['host_outlet_temp'] - METRIC_NAMES = dict( - ceilometer=dict( - host_outlet_temp='hardware.ipmi.node.outlet_temperature'), - gnocchi=dict( - host_outlet_temp='hardware.ipmi.node.outlet_temperature'), - ) - def __init__(self, config, osc=None): """Outlet temperature control using live migration @@ -165,14 +158,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): nodes = self.get_available_compute_nodes() hosts_need_release = [] hosts_target = [] - metric_name = self.METRIC_NAMES[ - self.datasource_backend.NAME]['host_outlet_temp'] + metric_name = 'host_outlet_temp' for node in nodes.values(): - resource_id = node.uuid outlet_temp = None outlet_temp = self.datasource_backend.statistic_aggregation( - resource_id=resource_id, + resource=node, + resource_type='compute_node', meter_name=metric_name, period=self.period, granularity=self.granularity, @@ -180,12 +172,12 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): # some hosts may not have outlet temp 
meters, remove from target if outlet_temp is None: - LOG.warning("%s: no outlet temp data", resource_id) + LOG.warning("%s: no outlet temp data", node.uuid) continue LOG.debug("%(resource)s: outlet temperature %(temp)f", - {'resource': resource_id, 'temp': outlet_temp}) - instance_data = {'node': node, 'outlet_temp': outlet_temp} + {'resource': node.uuid, 'temp': outlet_temp}) + instance_data = {'compute_node': node, 'outlet_temp': outlet_temp} if outlet_temp >= self.threshold: # mark the node to release resources hosts_need_release.append(instance_data) @@ -196,7 +188,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def choose_instance_to_migrate(self, hosts): """Pick up an active instance to migrate from provided hosts""" for instance_data in hosts: - mig_source_node = instance_data['node'] + mig_source_node = instance_data['compute_node'] instances_of_src = self.compute_model.get_node_instances( mig_source_node) for instance in instances_of_src: @@ -228,7 +220,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): # filter nodes without enough resource dest_servers = [] for instance_data in hosts: - host = instance_data['node'] + host = instance_data['compute_node'] # available cores_used, mem_used, disk_used = self.calc_used_resource(host) cores_available = host.vcpus - cores_used @@ -284,7 +276,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): dest_servers = sorted(dest_servers, key=lambda x: (x["outlet_temp"])) # always use the host with lowerest outlet temperature - mig_destination_node = dest_servers[0]['node'] + mig_destination_node = dest_servers[0]['compute_node'] # generate solution to migrate the instance to the dest server, if self.compute_model.migrate_instance( instance_src, mig_source_node, mig_destination_node): diff --git a/watcher/decision_engine/strategy/strategies/uniform_airflow.py b/watcher/decision_engine/strategy/strategies/uniform_airflow.py index 4bd51b057..e0b0e2a64 100644 --- 
a/watcher/decision_engine/strategy/strategies/uniform_airflow.py +++ b/watcher/decision_engine/strategy/strategies/uniform_airflow.py @@ -57,25 +57,6 @@ class UniformAirflow(base.BaseStrategy): DATASOURCE_METRICS = ['host_airflow', 'host_inlet_temp', 'host_power'] - METRIC_NAMES = dict( - ceilometer=dict( - # The meter to report Airflow of physical server in ceilometer - host_airflow='hardware.ipmi.node.airflow', - # The meter to report inlet temperature of physical server - # in ceilometer - host_inlet_temp='hardware.ipmi.node.temperature', - # The meter to report system power of physical server in ceilometer - host_power='hardware.ipmi.node.power'), - gnocchi=dict( - # The meter to report Airflow of physical server in gnocchi - host_airflow='hardware.ipmi.node.airflow', - # The meter to report inlet temperature of physical server - # in gnocchi - host_inlet_temp='hardware.ipmi.node.temperature', - # The meter to report system power of physical server in gnocchi - host_power='hardware.ipmi.node.power'), - ) - def __init__(self, config, osc=None): """Using live migration @@ -176,18 +157,20 @@ class UniformAirflow(base.BaseStrategy): source_instances = self.compute_model.get_node_instances( source_node) if source_instances: - inlet_t = self.datasource_backend.statistic_aggregation( - resource_id=source_node.uuid, - meter_name=self.meter_name_inlet_t, + inlet_temp = self.datasource_backend.statistic_aggregation( + resource=source_node, + resource_type='instance', + meter_name='host_inlet_temp', period=self._period, granularity=self.granularity) power = self.datasource_backend.statistic_aggregation( - resource_id=source_node.uuid, - meter_name=self.meter_name_power, + resource=source_node, + resource_type='instance', + meter_name='host_power', period=self._period, granularity=self.granularity) if (power < self.threshold_power and - inlet_t < self.threshold_inlet_t): + inlet_temp < self.threshold_inlet_t): # hardware issue, migrate all instances from this node for 
instance in source_instances: instances_tobe_migrate.append(instance) @@ -265,19 +248,19 @@ class UniformAirflow(base.BaseStrategy): airflow = None node = self.compute_model.get_node_by_uuid( node_id) - resource_id = node.uuid airflow = self.datasource_backend.statistic_aggregation( - resource_id=resource_id, - meter_name=self.meter_name_airflow, + resource=node, + resource_type='compute_node', + meter_name='host_airflow', period=self._period, granularity=self.granularity) # some hosts may not have airflow meter, remove from target if airflow is None: - LOG.warning("%s: no airflow data", resource_id) + LOG.warning("%s: no airflow data", node.uuid) continue LOG.debug("%(resource)s: airflow %(airflow)f", - {'resource': resource_id, 'airflow': airflow}) + {'resource': node, 'airflow': airflow}) nodemap = {'node': node, 'airflow': airflow} if airflow >= self.threshold_airflow: # mark the node to release resources @@ -288,12 +271,9 @@ class UniformAirflow(base.BaseStrategy): def pre_execute(self): self._pre_execute() - self.meter_name_airflow = self.METRIC_NAMES[ - self.datasource_backend.NAME]['host_airflow'] - self.meter_name_inlet_t = self.METRIC_NAMES[ - self.datasource_backend.NAME]['host_inlet_temp'] - self.meter_name_power = self.METRIC_NAMES[ - self.datasource_backend.NAME]['host_power'] + self.meter_name_airflow = 'host_airflow' + self.meter_name_inlet_t = 'host_inlet_temp' + self.meter_name_power = 'host_power' self.threshold_airflow = self.input_parameters.threshold_airflow self.threshold_inlet_t = self.input_parameters.threshold_inlet_t diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py index b3296dffc..7b2777f54 100644 --- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py @@ -65,36 +65,17 @@ class 
VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): an active compute node to any other active compute node. """ - HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent' - INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util' - AGGREGATION = 'mean' - + AGGREGATE = 'mean' DATASOURCE_METRICS = ['instance_ram_allocated', 'instance_cpu_usage', 'instance_ram_usage', 'instance_root_disk_size'] - METRIC_NAMES = dict( - ceilometer=dict( - cpu_util_metric='cpu_util', - ram_util_metric='memory.resident', - ram_alloc_metric='memory', - disk_alloc_metric='disk.root.size'), - gnocchi=dict( - cpu_util_metric='cpu_util', - ram_util_metric='memory.resident', - ram_alloc_metric='memory', - disk_alloc_metric='disk.root.size'), - ) - MIGRATION = "migrate" CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state" def __init__(self, config, osc=None): super(VMWorkloadConsolidation, self).__init__(config, osc) - self._ceilometer = None - self._gnocchi = None self.number_of_migrations = 0 self.number_of_released_nodes = 0 - # self.ceilometer_instance_data_cache = dict() self.datasource_instance_data_cache = dict() @classmethod @@ -272,28 +253,20 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): return self.datasource_instance_data_cache.get(instance.uuid) instance_cpu_util = self.datasource_backend.get_instance_cpu_usage( - instance.uuid, - self.period, - self.AGGREGATION, - granularity=self.granularity) + resource=instance, period=self.period, + aggregate=self.AGGREGATE, granularity=self.granularity) instance_ram_util = self.datasource_backend.get_instance_ram_usage( - instance.uuid, - self.period, - self.AGGREGATION, - granularity=self.granularity) + resource=instance, period=self.period, + aggregate=self.AGGREGATE, granularity=self.granularity) if not instance_ram_util: instance_ram_util = ( self.datasource_backend.get_instance_ram_allocated( - instance.uuid, - self.period, - self.AGGREGATION, - granularity=self.granularity)) + resource=instance, 
period=self.period, + aggregate=self.AGGREGATE, granularity=self.granularity)) instance_disk_util = ( self.datasource_backend.get_instance_root_disk_size( - instance.uuid, - self.period, - self.AGGREGATION, - granularity=self.granularity)) + resource=instance, period=self.period, + aggregate=self.AGGREGATE, granularity=self.granularity)) if instance_cpu_util: total_cpu_utilization = ( diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index 814632507..bc3b6bffe 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -45,7 +45,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): * Hardware: compute node should use the same physical CPUs/RAMs * Software: Ceilometer component ceilometer-agent-compute running in each compute node, and Ceilometer API can report such telemetry - "cpu_util" and "memory.resident" successfully. + "instance_cpu_usage" and "instance_ram_usage" successfully. * You must have at least 2 physical compute nodes to run this strategy. 
*Limitations* @@ -59,11 +59,9 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): # The meter to report CPU utilization % of VM in ceilometer # Unit: %, value range is [0 , 100] - CPU_METER_NAME = "cpu_util" # The meter to report memory resident of VM in ceilometer # Unit: MB - MEM_METER_NAME = "memory.resident" DATASOURCE_METRICS = ['instance_cpu_usage', 'instance_ram_usage'] @@ -105,8 +103,8 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): "description": "Workload balance based on metrics: " "cpu or ram utilization", "type": "string", - "choice": ["cpu_util", "memory.resident"], - "default": "cpu_util" + "choice": ["instance_cpu_usage", "instance_ram_usage"], + "default": "instance_cpu_usage" }, "threshold": { "description": "workload threshold for migration", @@ -155,7 +153,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): :param workload_cache: the map contains instance to workload mapping """ for instance_data in hosts: - source_node = instance_data['node'] + source_node = instance_data['compute_node'] source_instances = self.compute_model.get_node_instances( source_node) if source_instances: @@ -188,7 +186,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): self.compute_model.get_instance_by_uuid( instance_id)) else: - LOG.info("VM not found from node: %s", + LOG.info("VM not found from compute_node: %s", source_node.uuid) def filter_destination_hosts(self, hosts, instance_to_migrate, @@ -202,7 +200,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): destination_hosts = [] src_instance_workload = workload_cache[instance_to_migrate.uuid] for instance_data in hosts: - host = instance_data['node'] + host = instance_data['compute_node'] workload = instance_data['workload'] # calculate the available resources cores_used, mem_used, disk_used = self.calculate_used_resource( @@ -213,11 +211,11 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): if (cores_available >= 
required_cores and mem_available >= required_mem and disk_available >= required_disk): - if (self._meter == self.CPU_METER_NAME and + if (self._meter == 'instance_cpu_usage' and ((src_instance_workload + workload) < self.threshold / 100 * host.vcpus)): destination_hosts.append(instance_data) - if (self._meter == self.MEM_METER_NAME and + if (self._meter == 'instance_ram_usage' and ((src_instance_workload + workload) < self.threshold / 100 * host.memory)): destination_hosts.append(instance_data) @@ -225,7 +223,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): return destination_hosts def group_hosts_by_cpu_or_ram_util(self): - """Calculate the workloads of each node + """Calculate the workloads of each compute_node try to find out the nodes which have reached threshold and the nodes which are under threshold. @@ -249,9 +247,8 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): util = None try: util = self.datasource_backend.statistic_aggregation( - instance.uuid, self._meter, self._period, - self._granularity, aggregation='mean', - dimensions=dict(resource_id=instance.uuid)) + instance, 'instance', self._meter, self._period, + 'mean', self._granularity) except Exception as exc: LOG.exception(exc) LOG.error("Can not get %s from %s", self._meter, @@ -261,7 +258,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): LOG.debug("Instance (%s): %s is None", instance.uuid, self._meter) continue - if self._meter == self.CPU_METER_NAME: + if self._meter == 'instance_cpu_usage': workload_cache[instance.uuid] = (util * instance.vcpus / 100) else: @@ -271,13 +268,13 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): util) cluster_workload += node_workload - if self._meter == self.CPU_METER_NAME: + if self._meter == 'instance_cpu_usage': node_util = node_workload / node.vcpus * 100 else: node_util = node_workload / node.memory * 100 instance_data = { - 'node': node, self._meter: node_util, + 'compute_node': node, 
self._meter: node_util, 'workload': node_workload} if node_util >= self.threshold: # mark the node to release resources @@ -340,7 +337,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): destination_hosts = sorted(destination_hosts, key=lambda x: (x[self._meter])) # always use the host with lowerest CPU utilization - mig_destination_node = destination_hosts[0]['node'] + mig_destination_node = destination_hosts[0]['compute_node'] # generate solution to migrate the instance to the dest server, if self.compute_model.migrate_instance( instance_src, source_node, mig_destination_node): diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index 37a5fe29e..ece3686d1 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -21,7 +21,6 @@ import copy import itertools import math import random -import re import oslo_cache from oslo_config import cfg @@ -61,7 +60,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): MEMOIZE = _set_memoize(CONF) DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage', - 'instance_ram_usage', 'host_memory_usage'] + 'instance_ram_usage', 'host_ram_usage'] def __init__(self, config, osc=None): """Workload Stabilization control using live migration @@ -109,27 +108,28 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): "type": "array", "items": { "type": "string", - "enum": ["cpu_util", "memory.resident"] + "enum": ["instance_cpu_usage", "instance_ram_usage"] }, - "default": ["cpu_util"] + "default": ["instance_cpu_usage"] }, "thresholds": { "description": "Dict where key is a metric and value " "is a trigger value.", "type": "object", "properties": { - "cpu_util": { + "instance_cpu_usage": { "type": "number", "minimum": 0, "maximum": 1 }, - "memory.resident": { + "instance_ram_usage": 
{ "type": "number", "minimum": 0, "maximum": 1 } }, - "default": {"cpu_util": 0.1, "memory.resident": 0.1} + "default": {"instance_cpu_usage": 0.1, + "instance_ram_usage": 0.1} }, "weights": { "description": "These weights used to calculate " @@ -137,26 +137,26 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): " contains meter name and _weight suffix.", "type": "object", "properties": { - "cpu_util_weight": { + "instance_cpu_usage_weight": { "type": "number", "minimum": 0, "maximum": 1 }, - "memory.resident_weight": { + "instance_ram_usage_weight": { "type": "number", "minimum": 0, "maximum": 1 } }, - "default": {"cpu_util_weight": 1.0, - "memory.resident_weight": 1.0} + "default": {"instance_cpu_usage_weight": 1.0, + "instance_ram_usage_weight": 1.0} }, "instance_metrics": { "description": "Mapping to get hardware statistics using" " instance metrics", "type": "object", - "default": {"cpu_util": "compute.node.cpu.percent", - "memory.resident": "hardware.memory.used"} + "default": {"instance_cpu_usage": "host_cpu_usage", + "instance_ram_usage": "host_ram_usage"} }, "host_choice": { "description": "Method of host's choice. 
There are cycle," @@ -189,12 +189,12 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): "type": "integer", "minimum": 0 }, - "node": { + "compute_node": { "type": "integer", "minimum": 0 }, }, - "default": {"instance": 720, "node": 600} + "default": {"instance": 720, "compute_node": 600} }, "aggregation_method": { "description": "Function used to aggregate multiple " @@ -209,12 +209,12 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): "type": "string", "default": 'mean' }, - "node": { + "compute_node": { "type": "string", "default": 'mean' }, }, - "default": {"instance": 'mean', "node": 'mean'} + "default": {"instance": 'mean', "compute_node": 'mean'} }, "granularity": { "description": "The time between two measures in an " @@ -234,7 +234,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): :param host_vcpus: int :return: float value """ - return (instance_load['cpu_util'] * + return (instance_load['instance_cpu_usage'] * (instance_load['vcpus'] / float(host_vcpus))) @MEMOIZE @@ -248,17 +248,15 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): instance_load = {'uuid': instance.uuid, 'vcpus': instance.vcpus} for meter in self.metrics: avg_meter = self.datasource_backend.statistic_aggregation( - instance.uuid, meter, self.periods['instance'], - self.granularity, - aggregation=self.aggregation_method['instance']) + instance, 'instance', meter, self.periods['instance'], + self.aggregation_method['instance'], self.granularity) if avg_meter is None: LOG.warning( "No values returned by %(resource_id)s " "for %(metric_name)s", dict( resource_id=instance.uuid, metric_name=meter)) return - # cpu_util has been deprecated since Stein. 
- if meter == 'cpu_util': + if meter == 'instance_cpu_usage': avg_meter /= float(100) LOG.debug('Load of %(metric)s for %(instance)s is %(value)s', {'metric': meter, @@ -270,9 +268,10 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def normalize_hosts_load(self, hosts): normalized_hosts = copy.deepcopy(hosts) for host in normalized_hosts: - if 'memory.resident' in normalized_hosts[host]: + if 'instance_ram_usage' in normalized_hosts[host]: node = self.compute_model.get_node_by_uuid(host) - normalized_hosts[host]['memory.resident'] /= float(node.memory) + normalized_hosts[host]['instance_ram_usage'] \ + /= float(node.memory) return normalized_hosts @@ -290,29 +289,21 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): hosts_load[node_id]['vcpus'] = node.vcpus LOG.debug('Getting load for %s', node_id) for metric in self.metrics: - resource_id = '' avg_meter = None meter_name = self.instance_metrics[metric] - if re.match('^compute.node', meter_name) is not None: - resource_id = "%s_%s" % (node.uuid, node.hostname) - else: - resource_id = node_id avg_meter = self.datasource_backend.statistic_aggregation( - resource_id, self.instance_metrics[metric], - self.periods['node'], self.granularity, - aggregation=self.aggregation_method['node']) + node, 'compute_node', self.instance_metrics[metric], + self.periods['compute_node'], + self.aggregation_method['compute_node'], self.granularity) if avg_meter is None: LOG.warning('No values returned by node %s for %s', node_id, meter_name) del hosts_load[node_id] break else: - if meter_name == 'hardware.memory.used': + if meter_name == 'host_ram_usage': avg_meter /= oslo_utils.units.Ki - if meter_name == 'compute.node.cpu.percent': - avg_meter /= 100 - # hardware.cpu.util has been deprecated since Stein. 
- if meter_name == 'hardware.cpu.util': + if meter_name == 'host_cpu_usage': avg_meter /= 100 LOG.debug('Load of %(metric)s for %(node)s is %(value)s', {'metric': metric, @@ -369,7 +360,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): s_host_vcpus = new_hosts[src_node.uuid]['vcpus'] d_host_vcpus = new_hosts[dst_node.uuid]['vcpus'] for metric in self.metrics: - if metric == 'cpu_util': + if metric == 'instance_cpu_usage': new_hosts[src_node.uuid][metric] -= ( self.transform_instance_cpu(instance_load, s_host_vcpus)) new_hosts[dst_node.uuid][metric] += ( diff --git a/watcher/tests/datasources/test_ceilometer_helper.py b/watcher/tests/datasources/test_ceilometer_helper.py index 7251c1f0c..75cbc1b41 100644 --- a/watcher/tests/datasources/test_ceilometer_helper.py +++ b/watcher/tests/datasources/test_ceilometer_helper.py @@ -54,7 +54,6 @@ class TestCeilometerHelper(base.BaseTestCase): self.assertEqual(expected, query) def test_statistic_aggregation(self, mock_ceilometer): - cm = ceilometer_helper.CeilometerHelper() ceilometer = mock.MagicMock() statistic = mock.MagicMock() expected_result = 100 @@ -63,113 +62,73 @@ class TestCeilometerHelper(base.BaseTestCase): mock_ceilometer.return_value = ceilometer cm = ceilometer_helper.CeilometerHelper() val = cm.statistic_aggregation( - resource_id="INSTANCE_ID", - meter_name="cpu_util", + resource=mock.Mock(id="INSTANCE_ID"), + resource_type='instance', + meter_name="instance_cpu_usage", period="7300", granularity=None ) self.assertEqual(expected_result, val) - def test_get_last_sample(self, mock_ceilometer): - ceilometer = mock.MagicMock() - statistic = mock.MagicMock() - expected_result = 100 - statistic[-1]._info = {'counter_volume': expected_result} - ceilometer.samples.list.return_value = statistic - mock_ceilometer.return_value = ceilometer - cm = ceilometer_helper.CeilometerHelper() - val = cm.get_last_sample_value( - resource_id="id", - meter_name="compute.node.percent" - ) - 
self.assertEqual(expected_result, val) - - def test_get_last_sample_none(self, mock_ceilometer): - ceilometer = mock.MagicMock() - expected = [] - ceilometer.samples.list.return_value = expected - mock_ceilometer.return_value = ceilometer - cm = ceilometer_helper.CeilometerHelper() - val = cm.get_last_sample_values( - resource_id="id", - meter_name="compute.node.percent" - ) - self.assertEqual(expected, val) - - def test_statistic_list(self, mock_ceilometer): - ceilometer = mock.MagicMock() - expected_value = [] - ceilometer.statistics.list.return_value = expected_value - mock_ceilometer.return_value = ceilometer - cm = ceilometer_helper.CeilometerHelper() - val = cm.statistic_list(meter_name="cpu_util") - self.assertEqual(expected_value, val) - def test_get_host_cpu_usage(self, mock_ceilometer): self.helper.get_host_cpu_usage('compute1', 600, 'mean') self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_cpu_usage'], 600, None, - aggregation='mean') + 'compute1', 'compute_node', 'host_cpu_usage', 600, 'mean', None) - def test_get_instance_cpu_usage(self, mock_ceilometer): - self.helper.get_instance_cpu_usage('compute1', 600, 'mean') + def test_get_host_ram_usage(self, mock_ceilometer): + self.helper.get_host_ram_usage('compute1', 600, 'mean') self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_cpu_usage'], 600, - None, aggregation='mean') + 'compute1', 'compute_node', 'host_ram_usage', 600, 'mean', None) - def test_get_host_memory_usage(self, mock_ceilometer): - self.helper.get_host_memory_usage('compute1', 600, 'mean') - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_memory_usage'], 600, None, - aggregation='mean') - - def test_get_instance_memory_usage(self, mock_ceilometer): - self.helper.get_instance_ram_usage('compute1', 600, 'mean') - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_ram_usage'], 
600, - None, aggregation='mean') - - def test_get_instance_l3_cache_usage(self, mock_ceilometer): - self.helper.get_instance_l3_cache_usage('compute1', 600, 'mean') - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_l3_cache_usage'], 600, - None, aggregation='mean') - - def test_get_instance_ram_allocated(self, mock_ceilometer): - self.helper.get_instance_ram_allocated('compute1', 600, 'mean') - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_ram_allocated'], 600, - None, aggregation='mean') - - def test_get_instance_root_disk_allocated(self, mock_ceilometer): - self.helper.get_instance_root_disk_size('compute1', 600, 'mean') - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_root_disk_size'], 600, - None, aggregation='mean') - - def test_get_host_outlet_temperature(self, mock_ceilometer): + def test_get_host_outlet_temp(self, mock_ceilometer): self.helper.get_host_outlet_temp('compute1', 600, 'mean') self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_outlet_temp'], 600, None, - aggregation='mean') + 'compute1', 'compute_node', 'host_outlet_temp', 600, 'mean', None) - def test_get_host_inlet_temperature(self, mock_ceilometer): + def test_get_host_inlet_temp(self, mock_ceilometer): self.helper.get_host_inlet_temp('compute1', 600, 'mean') self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_inlet_temp'], 600, None, - aggregation='mean') + 'compute1', 'compute_node', 'host_inlet_temp', 600, 'mean', None) def test_get_host_airflow(self, mock_ceilometer): self.helper.get_host_airflow('compute1', 600, 'mean') self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_airflow'], 600, None, - aggregation='mean') + 'compute1', 'compute_node', 'host_airflow', 600, 'mean', None) def test_get_host_power(self, mock_ceilometer): 
self.helper.get_host_power('compute1', 600, 'mean') self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_power'], 600, None, - aggregation='mean') + 'compute1', 'compute_node', 'host_power', 600, 'mean', None) + + def test_get_instance_cpu_usage(self, mock_ceilometer): + self.helper.get_instance_cpu_usage('compute1', 600, 'mean') + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_cpu_usage', 600, 'mean', + None) + + def test_get_instance_ram_usage(self, mock_ceilometer): + self.helper.get_instance_ram_usage('compute1', 600, 'mean') + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_ram_usage', 600, 'mean', + None) + + def test_get_instance_ram_allocated(self, mock_ceilometer): + self.helper.get_instance_ram_allocated('compute1', 600, 'mean') + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_ram_allocated', 600, 'mean', + None) + + def test_get_instance_l3_cache_usage(self, mock_ceilometer): + self.helper.get_instance_l3_cache_usage('compute1', 600, 'mean') + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_l3_cache_usage', 600, 'mean', + None) + + def test_get_instance_root_disk_size(self, mock_ceilometer): + self.helper.get_instance_root_disk_size('compute1', 600, 'mean') + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_root_disk_size', 600, 'mean', + None) def test_check_availability(self, mock_ceilometer): ceilometer = mock.MagicMock() diff --git a/watcher/tests/datasources/test_gnocchi_helper.py b/watcher/tests/datasources/test_gnocchi_helper.py index c0ccbfc4d..370a1cbf3 100644 --- a/watcher/tests/datasources/test_gnocchi_helper.py +++ b/watcher/tests/datasources/test_gnocchi_helper.py @@ -48,85 +48,74 @@ class TestGnocchiHelper(base.BaseTestCase): helper = gnocchi_helper.GnocchiHelper() result = helper.statistic_aggregation( - 
resource_id='16a86790-327a-45f9-bc82-45839f062fdc', - meter_name='cpu_util', + resource=mock.Mock(id='16a86790-327a-45f9-bc82-45839f062fdc'), + resource_type='instance', + meter_name='instance_cpu_usage', period=300, granularity=360, - dimensions=None, - aggregation='mean', - group_by='*' + aggregate='mean', ) self.assertEqual(expected_result, result) def test_get_host_cpu_usage(self, mock_gnocchi): - self.helper.get_host_cpu_usage('compute1', 600, 'mean', - granularity=300) + self.helper.get_host_cpu_usage('compute1', 600, 'mean', 300) self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_cpu_usage'], 600, - 300, aggregation='mean') + 'compute1', 'compute_node', 'host_cpu_usage', 600, 'mean', + 300) - def test_get_instance_cpu_usage(self, mock_gnocchi): - self.helper.get_instance_cpu_usage('compute1', 600, 'mean', - granularity=300) + def test_get_host_ram_usage(self, mock_gnocchi): + self.helper.get_host_ram_usage('compute1', 600, 'mean', 300) self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_cpu_usage'], 600, - 300, aggregation='mean') - - def test_get_host_memory_usage(self, mock_gnocchi): - self.helper.get_host_memory_usage('compute1', 600, 'mean', - granularity=300) - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_memory_usage'], 600, - 300, aggregation='mean') - - def test_get_instance_memory_usage(self, mock_gnocchi): - self.helper.get_instance_ram_usage('compute1', 600, 'mean', - granularity=300) - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_ram_usage'], 600, - 300, aggregation='mean') - - def test_get_instance_ram_allocated(self, mock_gnocchi): - self.helper.get_instance_ram_allocated('compute1', 600, 'mean', - granularity=300) - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_ram_allocated'], 600, - 300, aggregation='mean') - - def 
test_get_instance_root_disk_allocated(self, mock_gnocchi): - self.helper.get_instance_root_disk_size('compute1', 600, 'mean', - granularity=300) - self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['instance_root_disk_size'], 600, - 300, aggregation='mean') + 'compute1', 'compute_node', 'host_ram_usage', 600, 'mean', + 300) def test_get_host_outlet_temperature(self, mock_gnocchi): - self.helper.get_host_outlet_temp('compute1', 600, 'mean', - granularity=300) + self.helper.get_host_outlet_temp('compute1', 600, 'mean', 300) self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_outlet_temp'], 600, - 300, aggregation='mean') + 'compute1', 'compute_node', 'host_outlet_temp', 600, 'mean', + 300) def test_get_host_inlet_temperature(self, mock_gnocchi): - self.helper.get_host_inlet_temp('compute1', 600, 'mean', - granularity=300) + self.helper.get_host_inlet_temp('compute1', 600, 'mean', 300) self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_inlet_temp'], 600, - 300, aggregation='mean') + 'compute1', 'compute_node', 'host_inlet_temp', 600, 'mean', + 300) def test_get_host_airflow(self, mock_gnocchi): - self.helper.get_host_airflow('compute1', 600, 'mean', - granularity=300) + self.helper.get_host_airflow('compute1', 600, 'mean', 300) self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_airflow'], 600, - 300, aggregation='mean') + 'compute1', 'compute_node', 'host_airflow', 600, 'mean', + 300) def test_get_host_power(self, mock_gnocchi): - self.helper.get_host_power('compute1', 600, 'mean', - granularity=300) + self.helper.get_host_power('compute1', 600, 'mean', 300) self.mock_aggregation.assert_called_once_with( - 'compute1', self.helper.METRIC_MAP['host_power'], 600, - 300, aggregation='mean') + 'compute1', 'compute_node', 'host_power', 600, 'mean', + 300) + + def test_get_instance_cpu_usage(self, mock_gnocchi): + 
self.helper.get_instance_cpu_usage('compute1', 600, 'mean', 300) + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_cpu_usage', 600, 'mean', + 300) + + def test_get_instance_memory_usage(self, mock_gnocchi): + self.helper.get_instance_ram_usage('compute1', 600, 'mean', 300) + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_ram_usage', 600, 'mean', + 300) + + def test_get_instance_ram_allocated(self, mock_gnocchi): + self.helper.get_instance_ram_allocated('compute1', 600, 'mean', 300) + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_ram_allocated', 600, 'mean', + 300) + + def test_get_instance_root_disk_allocated(self, mock_gnocchi): + self.helper.get_instance_root_disk_size('compute1', 600, 'mean', 300) + self.mock_aggregation.assert_called_once_with( + 'compute1', 'instance', 'instance_root_disk_size', 600, 'mean', + 300) def test_gnocchi_check_availability(self, mock_gnocchi): gnocchi = mock.MagicMock() diff --git a/watcher/tests/datasources/test_monasca_helper.py b/watcher/tests/datasources/test_monasca_helper.py index 0a1547e9f..c10f5db74 100644 --- a/watcher/tests/datasources/test_monasca_helper.py +++ b/watcher/tests/datasources/test_monasca_helper.py @@ -56,13 +56,12 @@ class TestMonascaHelper(base.BaseTestCase): helper = monasca_helper.MonascaHelper() result = helper.statistic_aggregation( - resource_id=None, - meter_name='cpu.percent', + resource=mock.Mock(id='NODE_UUID'), + resource_type='compute_node', + meter_name='host_cpu_usage', period=7200, granularity=300, - dimensions={'hostname': 'NODE_UUID'}, - aggregation='avg', - group_by='*', + aggregate='mean', ) self.assertEqual(0.6, result) @@ -81,55 +80,14 @@ class TestMonascaHelper(base.BaseTestCase): helper = monasca_helper.MonascaHelper() self.assertEqual('not available', helper.check_availability()) - def test_monasca_statistic_list(self, mock_monasca): - monasca = mock.MagicMock() - 
expected_result = [{ - 'columns': ['timestamp', 'value', 'value_meta'], - 'dimensions': { - 'hostname': 'rdev-indeedsrv001', - 'service': 'monasca'}, - 'id': '0', - 'measurements': [ - ['2016-07-29T12:54:06.000Z', 0.9, {}], - ['2016-07-29T12:54:36.000Z', 0.9, {}], - ['2016-07-29T12:55:06.000Z', 0.9, {}], - ['2016-07-29T12:55:36.000Z', 0.8, {}]], - 'name': 'cpu.percent'}] - - monasca.metrics.list_measurements.return_value = expected_result - mock_monasca.return_value = monasca - helper = monasca_helper.MonascaHelper() - val = helper.statistics_list(meter_name="cpu.percent", dimensions={}) - self.assertEqual(expected_result, val) - - def test_monasca_statistic_list_query_retry(self, mock_monasca): - monasca = mock.MagicMock() - expected_result = [{ - 'columns': ['timestamp', 'value', 'value_meta'], - 'dimensions': { - 'hostname': 'rdev-indeedsrv001', - 'service': 'monasca'}, - 'id': '0', - 'measurements': [ - ['2016-07-29T12:54:06.000Z', 0.9, {}], - ['2016-07-29T12:54:36.000Z', 0.9, {}], - ['2016-07-29T12:55:06.000Z', 0.9, {}], - ['2016-07-29T12:55:36.000Z', 0.8, {}]], - 'name': 'cpu.percent'}] - - monasca.metrics.list_measurements.side_effect = [expected_result] - mock_monasca.return_value = monasca - helper = monasca_helper.MonascaHelper() - val = helper.statistics_list(meter_name="cpu.percent", dimensions={}) - self.assertEqual(expected_result, val) - def test_get_host_cpu_usage(self, mock_monasca): - node = "compute1_compute1" self.mock_aggregation.return_value = 0.6 + node = mock.Mock(id='compute1') cpu_usage = self.helper.get_host_cpu_usage(node, 600, 'mean') self.assertEqual(0.6, cpu_usage) def test_get_instance_cpu_usage(self, mock_monasca): self.mock_aggregation.return_value = 0.6 - cpu_usage = self.helper.get_instance_cpu_usage('vm1', 600, 'mean') + node = mock.Mock(id='vm1') + cpu_usage = self.helper.get_instance_cpu_usage(node, 600, 'mean') self.assertEqual(0.6, cpu_usage) diff --git a/watcher/tests/decision_engine/model/ceilometer_metrics.py 
b/watcher/tests/decision_engine/model/ceilometer_metrics.py index 64aeb80bd..1db67d124 100644 --- a/watcher/tests/decision_engine/model/ceilometer_metrics.py +++ b/watcher/tests/decision_engine/model/ceilometer_metrics.py @@ -28,64 +28,74 @@ class FakeCeilometerMetrics(object): def empty_one_metric(self, emptytype): self.emptytype = emptytype - def mock_get_statistics(self, resource_id=None, meter_name=None, - period=None, granularity=None, dimensions=None, - aggregation='avg', group_by='*'): + def mock_get_statistics(self, resource=None, resource_type=None, + meter_name=None, period=None, aggregate='mean', + granularity=None): result = 0 - if meter_name == "hardware.cpu.util": - result = self.get_usage_node_cpu(resource_id) - elif meter_name == "compute.node.cpu.percent": - result = self.get_usage_node_cpu(resource_id) - elif meter_name == "hardware.memory.used": - result = self.get_usage_node_ram(resource_id) - elif meter_name == "cpu_util": - result = self.get_average_usage_instance_cpu(resource_id) - elif meter_name == "memory.resident": - result = self.get_average_usage_instance_memory(resource_id) - elif meter_name == "hardware.ipmi.node.outlet_temperature": - result = self.get_average_outlet_temperature(resource_id) - elif meter_name == "hardware.ipmi.node.airflow": - result = self.get_average_airflow(resource_id) - elif meter_name == "hardware.ipmi.node.temperature": - result = self.get_average_inlet_t(resource_id) - elif meter_name == "hardware.ipmi.node.power": - result = self.get_average_power(resource_id) + if meter_name == 'host_cpu_usage': + result = self.get_usage_compute_node_cpu(resource) + elif meter_name == 'host_ram_usage': + result = self.get_usage_compute_node_ram(resource) + elif meter_name == 'host_outlet_temp': + result = self.get_average_outlet_temp(resource) + elif meter_name == 'host_inlet_temp': + result = self.get_average_inlet_temp(resource) + elif meter_name == 'host_airflow': + result = self.get_average_airflow(resource) + elif 
meter_name == 'host_power': + result = self.get_average_power(resource) + elif meter_name == 'instance_cpu_usage': + result = self.get_average_usage_instance_cpu(resource) + elif meter_name == 'instance_ram_usage': + result = self.get_average_usage_instance_memory(resource) return result - def mock_get_statistics_wb(self, resource_id, meter_name, period, - granularity, dimensions=None, - aggregation='avg', group_by='*'): - result = 0.0 - if meter_name == "cpu_util": - result = self.get_average_usage_instance_cpu_wb(resource_id) - elif meter_name == "memory.resident": - result = self.get_average_usage_instance_memory_wb(resource_id) - return result + def mock_get_statistics_nn(self, resource=None, meter_name=None, + period=None, aggregate='mean', granularity=300): + """Statistics for noisy neighbor strategy + + Signature should match DataSourceBase.get_instance_l3_cache_usage + """ - def mock_get_statistics_nn(self, resource_id, period, - aggregation, granularity=300): result = 0.0 if period == 100: - result = self.get_average_l3_cache_current(resource_id) + result = self.get_average_l3_cache_current(resource) if period == 200: - result = self.get_average_l3_cache_previous(resource_id) + result = self.get_average_l3_cache_previous(resource) + return result + + def mock_get_statistics_wb(self, resource=None, resource_type=None, + meter_name=None, period=None, aggregate='mean', + granularity=None): + """Statistics for workload balance strategy""" + + result = 0.0 + if meter_name == 'instance_cpu_usage': + result = self.get_average_usage_instance_cpu_wb(resource) + elif meter_name == 'instance_ram_usage': + result = self.get_average_usage_instance_memory_wb(resource) return result @staticmethod - def get_average_l3_cache_current(uuid): + def get_average_l3_cache_current(resource): """The average l3 cache used by instance""" + + uuid = resource.uuid + mock = {} mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 35 * oslo_utils.units.Ki 
mock['cae81432-1631-4d4e-b29c-6f3acdcde906'] = 30 * oslo_utils.units.Ki mock['INSTANCE_3'] = 40 * oslo_utils.units.Ki mock['INSTANCE_4'] = 35 * oslo_utils.units.Ki - if uuid not in mock.keys(): - mock[uuid] = 25 * oslo_utils.units.Ki + return mock[str(uuid)] @staticmethod - def get_average_l3_cache_previous(uuid): + def get_average_l3_cache_previous(resource): """The average l3 cache used by instance""" + + uuid = resource.uuid + mock = {} mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 34.5 * ( oslo_utils.units.Ki) @@ -93,13 +103,14 @@ class FakeCeilometerMetrics(object): oslo_utils.units.Ki) mock['INSTANCE_3'] = 60 * oslo_utils.units.Ki mock['INSTANCE_4'] = 22.5 * oslo_utils.units.Ki - if uuid not in mock.keys(): - mock[uuid] = 25 * oslo_utils.units.Ki + return mock[str(uuid)] @staticmethod - def get_average_outlet_temperature(uuid): + def get_average_outlet_temp(resource): """The average outlet temperature for host""" + + uuid = resource.uuid mock = {} mock['Node_0'] = 30 # use a big value to make sure it exceeds threshold @@ -109,7 +120,9 @@ class FakeCeilometerMetrics(object): return float(mock[str(uuid)]) @staticmethod - def get_usage_node_ram(uuid): + def get_usage_compute_node_ram(resource): + + uuid = resource.uuid mock = {} # Ceilometer returns hardware.memory.used samples in KB. 
mock['Node_0'] = 7 * oslo_utils.units.Ki @@ -125,8 +138,10 @@ class FakeCeilometerMetrics(object): return float(mock[str(uuid)]) @staticmethod - def get_average_airflow(uuid): + def get_average_airflow(resource): """The average outlet temperature for host""" + + uuid = resource.uuid mock = {} mock['Node_0'] = 400 # use a big value to make sure it exceeds threshold @@ -136,8 +151,10 @@ class FakeCeilometerMetrics(object): return mock[str(uuid)] @staticmethod - def get_average_inlet_t(uuid): + def get_average_inlet_temp(resource): """The average outlet temperature for host""" + + uuid = resource.uuid mock = {} mock['Node_0'] = 24 mock['Node_1'] = 26 @@ -146,8 +163,10 @@ class FakeCeilometerMetrics(object): return mock[str(uuid)] @staticmethod - def get_average_power(uuid): + def get_average_power(resource): """The average outlet temperature for host""" + + uuid = resource.uuid mock = {} mock['Node_0'] = 260 mock['Node_1'] = 240 @@ -156,64 +175,48 @@ class FakeCeilometerMetrics(object): return mock[str(uuid)] @staticmethod - def get_usage_node_cpu(*args, **kwargs): + def get_usage_compute_node_cpu(*args, **kwargs): """The last VM CPU usage values to average :param uuid:00 :return: """ - uuid = args[0] - # query influxdb stream - # compute in stream + resource = args[0] + uuid = "%s_%s" % (resource.uuid, resource.hostname) - # Normalize - mock = {} + measurements = {} # node 0 - mock['Node_0_hostname_0'] = 7 - mock['Node_1_hostname_1'] = 7 + measurements['Node_0_hostname_0'] = 7 + measurements['Node_1_hostname_1'] = 7 # node 1 - mock['Node_2_hostname_2'] = 80 + measurements['Node_2_hostname_2'] = 80 # node 2 - mock['Node_3_hostname_3'] = 5 - mock['Node_4_hostname_4'] = 5 - mock['Node_5_hostname_5'] = 10 + measurements['Node_3_hostname_3'] = 5 + measurements['Node_4_hostname_4'] = 5 + measurements['Node_5_hostname_5'] = 10 # node 3 - mock['Node_6_hostname_6'] = 8 + measurements['Node_6_hostname_6'] = 8 # This node doesn't send metrics - mock['LOST_NODE_hostname_7'] = 
None - mock['Node_19_hostname_19'] = 10 + measurements['LOST_NODE_hostname_7'] = None + measurements['Node_19_hostname_19'] = 10 # node 4 - mock['INSTANCE_7_hostname_7'] = 4 + measurements['INSTANCE_7_hostname_7'] = 4 - mock['Node_0'] = 7 - mock['Node_1'] = 5 - mock['Node_2'] = 10 - mock['Node_3'] = 4 - mock['Node_4'] = 2 - - if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = 8 - - if mock[str(uuid)] is not None: - return float(mock[str(uuid)]) - else: - return mock[str(uuid)] + result = measurements[uuid] + return float(result) if result is not None else None @staticmethod - def get_average_usage_instance_cpu_wb(uuid): + def get_average_usage_instance_cpu_wb(resource): """The last VM CPU usage values to average - :param uuid:00 + :param resource: :return: """ - # query influxdb stream - # compute in stream + uuid = resource.uuid - # Normalize mock = {} # node 0 mock['INSTANCE_1'] = 80 @@ -221,19 +224,20 @@ class FakeCeilometerMetrics(object): # node 1 mock['INSTANCE_3'] = 20 mock['INSTANCE_4'] = 10 + return float(mock[str(uuid)]) @staticmethod - def get_average_usage_instance_memory_wb(uuid): + def get_average_usage_instance_memory_wb(resource): + uuid = resource.uuid + mock = {} # node 0 mock['INSTANCE_1'] = 30 + mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 12 # node 1 mock['INSTANCE_3'] = 12 mock['INSTANCE_4'] = 12 - if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = 12 return mock[str(uuid)] @@ -244,12 +248,10 @@ class FakeCeilometerMetrics(object): :param uuid:00 :return: """ - uuid = args[0] - # query influxdb stream - # compute in stream + resource = args[0] + uuid = resource.uuid - # Normalize mock = {} # node 0 mock['INSTANCE_0'] = 7 @@ -260,22 +262,22 @@ class FakeCeilometerMetrics(object): mock['INSTANCE_3'] = 5 mock['INSTANCE_4'] = 5 mock['INSTANCE_5'] = 10 - # node 3 mock['INSTANCE_6'] = 8 - # node 4 mock['INSTANCE_7'] = 4 - mock['LOST_INSTANCE'] = None + + # metrics might be missing in 
scenarios which do not do computations if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = 8 + mock[uuid] = 0 return mock[str(uuid)] @staticmethod - def get_average_usage_instance_memory(uuid): + def get_average_usage_instance_memory(resource): + uuid = resource.uuid + mock = {} # node 0 mock['INSTANCE_0'] = 2 @@ -286,20 +288,17 @@ class FakeCeilometerMetrics(object): mock['INSTANCE_3'] = 8 mock['INSTANCE_4'] = 5 mock['INSTANCE_5'] = 16 - # node 3 mock['INSTANCE_6'] = 8 - # node 4 mock['INSTANCE_7'] = 4 - if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = 10 return mock[str(uuid)] @staticmethod - def get_average_usage_instance_disk(uuid): + def get_average_usage_instance_disk(resource): + uuid = resource.uuid + mock = {} # node 0 mock['INSTANCE_0'] = 2 @@ -310,15 +309,9 @@ class FakeCeilometerMetrics(object): mock['INSTANCE_3'] = 10 mock['INSTANCE_4'] = 15 mock['INSTANCE_5'] = 20 - # node 3 mock['INSTANCE_6'] = 8 - # node 4 mock['INSTANCE_7'] = 4 - if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = 4 - return mock[str(uuid)] diff --git a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py index 2ad485ef4..0afa43012 100644 --- a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py +++ b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py @@ -84,23 +84,24 @@ class FakeCeilometerMetrics(object): def __init__(self, model): self.model = model - def mock_get_statistics(self, resource_id=None, meter_name=None, - period=300, granularity=300, dimensions=None, - aggregation='avg', group_by='*'): - if meter_name == "compute.node.cpu.percent": - return self.get_node_cpu_util(resource_id, period, - aggregation, granularity) - elif meter_name == "cpu_util": - return self.get_instance_cpu_util(resource_id, period, - aggregation, granularity) - elif meter_name == "memory.resident": - return 
self.get_instance_ram_util(resource_id, period, - aggregation, granularity) - elif meter_name == "disk.root.size": - return self.get_instance_disk_root_size(resource_id, period, - aggregation, granularity) + def mock_get_statistics(self, resource=None, resource_type=None, + meter_name=None, period=300, aggregate='mean', + granularity=300): + if meter_name == 'host_cpu_usage': + return self.get_compute_node_cpu_util( + resource, period, aggregate, granularity) + elif meter_name == 'instance_cpu_usage': + return self.get_instance_cpu_util( + resource, period, aggregate, granularity) + elif meter_name == 'instance_ram_usage': + return self.get_instance_ram_util( + resource, period, aggregate, granularity) + elif meter_name == 'instance_root_disk_size': + return self.get_instance_disk_root_size( + resource, period, aggregate, granularity) - def get_node_cpu_util(self, r_id, period, aggregation, granularity): + def get_compute_node_cpu_util(self, resource, period, + aggregate, granularity): """Calculates node utilization dynamicaly. node CPU utilization should consider @@ -109,7 +110,7 @@ class FakeCeilometerMetrics(object): Returns relative node CPU utilization <0, 100>. 
:param r_id: resource id """ - node_uuid = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1]) + node_uuid = '%s_%s' % (resource.uuid, resource.hostname) node = self.model.get_node_by_uuid(node_uuid) instances = self.model.get_node_instances(node) util_sum = 0.0 @@ -122,7 +123,8 @@ class FakeCeilometerMetrics(object): return util_sum * 100.0 @staticmethod - def get_instance_cpu_util(r_id, period, aggregation, granularity): + def get_instance_cpu_util(resource, period, aggregate, + granularity): instance_cpu_util = dict() instance_cpu_util['INSTANCE_0'] = 10 instance_cpu_util['INSTANCE_1'] = 30 @@ -134,10 +136,11 @@ class FakeCeilometerMetrics(object): instance_cpu_util['INSTANCE_7'] = 100 instance_cpu_util['INSTANCE_8'] = 100 instance_cpu_util['INSTANCE_9'] = 100 - return instance_cpu_util[str(r_id)] + return instance_cpu_util[str(resource.uuid)] @staticmethod - def get_instance_ram_util(r_id, period, aggregation, granularity): + def get_instance_ram_util(resource, period, aggregate, + granularity): instance_ram_util = dict() instance_ram_util['INSTANCE_0'] = 1 instance_ram_util['INSTANCE_1'] = 2 @@ -149,10 +152,11 @@ class FakeCeilometerMetrics(object): instance_ram_util['INSTANCE_7'] = 2 instance_ram_util['INSTANCE_8'] = 4 instance_ram_util['INSTANCE_9'] = 8 - return instance_ram_util[str(r_id)] + return instance_ram_util[str(resource.uuid)] @staticmethod - def get_instance_disk_root_size(r_id, period, aggregation, granularity): + def get_instance_disk_root_size(resource, period, aggregate, + granularity): instance_disk_util = dict() instance_disk_util['INSTANCE_0'] = 10 instance_disk_util['INSTANCE_1'] = 15 @@ -164,30 +168,31 @@ class FakeCeilometerMetrics(object): instance_disk_util['INSTANCE_7'] = 25 instance_disk_util['INSTANCE_8'] = 25 instance_disk_util['INSTANCE_9'] = 25 - return instance_disk_util[str(r_id)] + return instance_disk_util[str(resource.uuid)] class FakeGnocchiMetrics(object): def __init__(self, model): self.model = model - def 
mock_get_statistics(self, resource_id=None, meter_name=None, - period=300, granularity=300, dimensions=None, - aggregation='avg', group_by='*'): - if meter_name == "compute.node.cpu.percent": - return self.get_node_cpu_util(resource_id, period, - aggregation, granularity) - elif meter_name == "cpu_util": - return self.get_instance_cpu_util(resource_id, period, - aggregation, granularity) - elif meter_name == "memory.resident": - return self.get_instance_ram_util(resource_id, period, - aggregation, granularity) - elif meter_name == "disk.root.size": - return self.get_instance_disk_root_size(resource_id, period, - aggregation, granularity) + def mock_get_statistics(self, resource=None, resource_type=None, + meter_name=None, period=300, aggregate='mean', + granularity=300): + if meter_name == 'host_cpu_usage': + return self.get_compute_node_cpu_util( + resource, period, aggregate, granularity) + elif meter_name == 'instance_cpu_usage': + return self.get_instance_cpu_util( + resource, period, aggregate, granularity) + elif meter_name == 'instance_ram_usage': + return self.get_instance_ram_util( + resource, period, aggregate, granularity) + elif meter_name == 'instance_root_disk_size': + return self.get_instance_disk_root_size( + resource, period, aggregate, granularity) - def get_node_cpu_util(self, r_id, period, aggregation, granularity): + def get_compute_node_cpu_util(self, resource, period, aggregate, + granularity): """Calculates node utilization dynamicaly. 
node CPU utilization should consider @@ -197,7 +202,7 @@ class FakeGnocchiMetrics(object): :param r_id: resource id """ - node_uuid = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1]) + node_uuid = "%s_%s" % (resource.uuid, resource.hostname) node = self.model.get_node_by_uuid(node_uuid) instances = self.model.get_node_instances(node) util_sum = 0.0 @@ -210,7 +215,8 @@ class FakeGnocchiMetrics(object): return util_sum * 100.0 @staticmethod - def get_instance_cpu_util(r_id, period, aggregation, granularity): + def get_instance_cpu_util(resource, period, aggregate, + granularity): instance_cpu_util = dict() instance_cpu_util['INSTANCE_0'] = 10 instance_cpu_util['INSTANCE_1'] = 30 @@ -222,10 +228,11 @@ class FakeGnocchiMetrics(object): instance_cpu_util['INSTANCE_7'] = 100 instance_cpu_util['INSTANCE_8'] = 100 instance_cpu_util['INSTANCE_9'] = 100 - return instance_cpu_util[str(r_id)] + return instance_cpu_util[str(resource.uuid)] @staticmethod - def get_instance_ram_util(r_id, period, aggregation, granularity): + def get_instance_ram_util(resource, period, aggregate, + granularity): instance_ram_util = dict() instance_ram_util['INSTANCE_0'] = 1 instance_ram_util['INSTANCE_1'] = 2 @@ -237,10 +244,11 @@ class FakeGnocchiMetrics(object): instance_ram_util['INSTANCE_7'] = 2 instance_ram_util['INSTANCE_8'] = 4 instance_ram_util['INSTANCE_9'] = 8 - return instance_ram_util[str(r_id)] + return instance_ram_util[str(resource.uuid)] @staticmethod - def get_instance_disk_root_size(r_id, period, aggregation, granularity): + def get_instance_disk_root_size(resource, period, aggregate, + granularity): instance_disk_util = dict() instance_disk_util['INSTANCE_0'] = 10 instance_disk_util['INSTANCE_1'] = 15 @@ -252,4 +260,4 @@ class FakeGnocchiMetrics(object): instance_disk_util['INSTANCE_7'] = 25 instance_disk_util['INSTANCE_8'] = 25 instance_disk_util['INSTANCE_9'] = 25 - return instance_disk_util[str(r_id)] + return instance_disk_util[str(resource.uuid)] diff --git 
a/watcher/tests/decision_engine/model/gnocchi_metrics.py b/watcher/tests/decision_engine/model/gnocchi_metrics.py index 817e4636d..a0d3c7ee2 100644 --- a/watcher/tests/decision_engine/model/gnocchi_metrics.py +++ b/watcher/tests/decision_engine/model/gnocchi_metrics.py @@ -23,54 +23,74 @@ class FakeGnocchiMetrics(object): def empty_one_metric(self, emptytype): self.emptytype = emptytype - def mock_get_statistics(self, resource_id=None, meter_name=None, - period=None, granularity=None, dimensions=None, - aggregation='avg', group_by='*'): + def mock_get_statistics(self, resource=None, resource_type=None, + meter_name=None, period=None, aggregate='mean', + granularity=None): result = 0 - if meter_name == "hardware.cpu.util": - result = self.get_usage_node_cpu(resource_id) - elif meter_name == "compute.node.cpu.percent": - result = self.get_usage_node_cpu(resource_id) - elif meter_name == "hardware.memory.used": - result = self.get_usage_node_ram(resource_id) - elif meter_name == "cpu_util": - result = self.get_average_usage_instance_cpu(resource_id) - elif meter_name == "memory.resident": - result = self.get_average_usage_instance_memory(resource_id) - elif meter_name == "hardware.ipmi.node.outlet_temperature": - result = self.get_average_outlet_temperature(resource_id) - elif meter_name == "hardware.ipmi.node.airflow": - result = self.get_average_airflow(resource_id) - elif meter_name == "hardware.ipmi.node.temperature": - result = self.get_average_inlet_t(resource_id) - elif meter_name == "hardware.ipmi.node.power": - result = self.get_average_power(resource_id) + if meter_name == 'host_cpu_usage': + result = self.get_usage_compute_node_cpu(resource) + elif meter_name == 'host_ram_usage': + result = self.get_usage_compute_node_ram(resource) + elif meter_name == 'host_outlet_temp': + result = self.get_average_outlet_temperature(resource) + elif meter_name == 'host_inlet_temp': + result = self.get_average_inlet_temp(resource) + elif meter_name == 'host_airflow': + 
result = self.get_average_airflow(resource) + elif meter_name == 'host_power': + result = self.get_average_power(resource) + elif meter_name == 'instance_cpu_usage': + result = self.get_average_usage_instance_cpu(resource) + elif meter_name == 'instance_ram_usage': + result = self.get_average_usage_instance_memory(resource) return result - def mock_get_statistics_nn(self, resource_id, period, - aggregation, granularity=300): + def mock_get_statistics_nn(self, resource=None, meter_name=None, + period=None, aggregate='mean', granularity=300): + """Statistics for noisy neighbor strategy + + Signature should match DataSourceBase.get_instance_l3_cache_usage + """ + result = 0.0 if period == 100: - result = self.get_average_l3_cache_current(resource_id) + result = self.get_average_l3_cache_current(resource) if period == 200: - result = self.get_average_l3_cache_previous(resource_id) + result = self.get_average_l3_cache_previous(resource) + return result + + def mock_get_statistics_wb(self, resource=None, resource_type=None, + meter_name=None, period=None, aggregate='mean', + granularity=300): + """Statistics for workload balance strategy""" + + result = 0.0 + if meter_name == 'instance_cpu_usage': + result = self.get_average_usage_instance_cpu_wb(resource) + elif meter_name == 'instance_ram_usage': + result = self.get_average_usage_instance_memory_wb(resource) return result @staticmethod - def get_average_l3_cache_current(uuid): + def get_average_l3_cache_current(resource): """The average l3 cache used by instance""" + + uuid = resource.uuid + mock = {} mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 35 * oslo_utils.units.Ki mock['cae81432-1631-4d4e-b29c-6f3acdcde906'] = 30 * oslo_utils.units.Ki mock['INSTANCE_3'] = 40 * oslo_utils.units.Ki mock['INSTANCE_4'] = 35 * oslo_utils.units.Ki - if uuid not in mock.keys(): - mock[uuid] = 25 * oslo_utils.units.Ki + return mock[str(uuid)] @staticmethod - def get_average_l3_cache_previous(uuid): + def 
get_average_l3_cache_previous(resource): """The average l3 cache used by instance""" + + uuid = resource.uuid + mock = {} mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 34.5 * ( oslo_utils.units.Ki) @@ -78,33 +98,26 @@ class FakeGnocchiMetrics(object): oslo_utils.units.Ki) mock['INSTANCE_3'] = 60 * oslo_utils.units.Ki mock['INSTANCE_4'] = 22.5 * oslo_utils.units.Ki - if uuid not in mock.keys(): - mock[uuid] = 25 * oslo_utils.units.Ki + return mock[str(uuid)] - def mock_get_statistics_wb(self, resource_id, meter_name, period, - granularity, dimensions=None, - aggregation='avg', group_by='*'): - result = 0.0 - if meter_name == "cpu_util": - result = self.get_average_usage_instance_cpu_wb(resource_id) - elif meter_name == "memory.resident": - result = self.get_average_usage_instance_memory_wb(resource_id) - return result - @staticmethod - def get_average_outlet_temperature(uuid): + def get_average_outlet_temperature(resource): """The average outlet temperature for host""" + + uuid = resource.uuid + mock = {} mock['Node_0'] = 30 # use a big value to make sure it exceeds threshold mock['Node_1'] = 100 - if uuid not in mock.keys(): - mock[uuid] = 100 + return mock[str(uuid)] @staticmethod - def get_usage_node_ram(uuid): + def get_usage_compute_node_ram(resource): + uuid = resource.uuid + mock = {} # Gnocchi returns hardware.memory.used samples in KB. 
mock['Node_0'] = 7 * oslo_utils.units.Ki @@ -113,83 +126,81 @@ class FakeGnocchiMetrics(object): mock['Node_3'] = 8 * oslo_utils.units.Ki mock['Node_4'] = 4 * oslo_utils.units.Ki - if uuid not in mock.keys(): - mock[uuid] = 8 - return float(mock[str(uuid)]) @staticmethod - def get_average_airflow(uuid): + def get_average_airflow(resource): """The average outlet temperature for host""" + + uuid = resource.uuid + mock = {} mock['Node_0'] = 400 # use a big value to make sure it exceeds threshold mock['Node_1'] = 100 - if uuid not in mock.keys(): - mock[uuid] = 200 + return mock[str(uuid)] @staticmethod - def get_average_inlet_t(uuid): + def get_average_inlet_temp(resource): """The average outlet temperature for host""" + + uuid = resource.uuid + mock = {} mock['Node_0'] = 24 mock['Node_1'] = 26 - if uuid not in mock.keys(): - mock[uuid] = 28 + return mock[str(uuid)] @staticmethod - def get_average_power(uuid): + def get_average_power(resource): """The average outlet temperature for host""" + + uuid = resource.uuid + mock = {} mock['Node_0'] = 260 mock['Node_1'] = 240 - if uuid not in mock.keys(): - mock[uuid] = 200 + return mock[str(uuid)] @staticmethod - def get_usage_node_cpu(*args, **kwargs): + def get_usage_compute_node_cpu(*args, **kwargs): """The last VM CPU usage values to average :param uuid: instance UUID :return: float value """ - uuid = args[0] + + resource = args[0] + uuid = "%s_%s" % (resource.uuid, resource.hostname) + # Normalize - mock = {} + measurements = {} # node 0 - mock['Node_0_hostname_0'] = 7 - mock['Node_1_hostname_1'] = 7 + measurements['Node_0_hostname_0'] = 7 + measurements['Node_1_hostname_1'] = 7 # node 1 - mock['Node_2_hostname_2'] = 80 + measurements['Node_2_hostname_2'] = 80 # node 2 - mock['Node_3_hostname_3'] = 5 - mock['Node_4_hostname_4'] = 5 - mock['Node_5_hostname_5'] = 10 - + measurements['Node_3_hostname_3'] = 5 + measurements['Node_4_hostname_4'] = 5 + measurements['Node_5_hostname_5'] = 10 # node 3 - mock['Node_6_hostname_6'] 
= 8 + measurements['Node_6_hostname_6'] = 8 # This node doesn't send metrics - mock['LOST_NODE_hostname_7'] = None - mock['Node_19_hostname_19'] = 10 + measurements['LOST_NODE_hostname_7'] = None + measurements['Node_19_hostname_19'] = 10 # node 4 - mock['INSTANCE_7_hostname_7'] = 4 + measurements['INSTANCE_7_hostname_7'] = 4 - mock['Node_0'] = 7 - mock['Node_1'] = 5 - mock['Node_2'] = 10 - mock['Node_3'] = 4 - mock['Node_4'] = 2 + # metrics might be missing in scenarios which do not do computations + if uuid not in measurements.keys(): + measurements[uuid] = 0 - if uuid not in mock.keys(): - mock[uuid] = 8 - - if mock[str(uuid)] is not None: - return float(mock[str(uuid)]) - else: - return mock[str(uuid)] + result = measurements[uuid] + return float(result) if result is not None else None @staticmethod def get_average_usage_instance_cpu(*args, **kwargs): @@ -198,8 +209,10 @@ class FakeGnocchiMetrics(object): :param uuid: instance UUID :return: int value """ - uuid = args[0] - # Normalize + + resource = args[0] + uuid = resource.uuid + mock = {} # node 0 mock['INSTANCE_0'] = 7 @@ -210,22 +223,24 @@ class FakeGnocchiMetrics(object): mock['INSTANCE_3'] = 5 mock['INSTANCE_4'] = 5 mock['INSTANCE_5'] = 10 - # node 3 mock['INSTANCE_6'] = 8 - # node 4 mock['INSTANCE_7'] = 4 mock['LOST_INSTANCE'] = None + + # metrics might be missing in scenarios which do not do computations if uuid not in mock.keys(): - mock[uuid] = 8 + mock[uuid] = 0 return mock[str(uuid)] @staticmethod - def get_average_usage_instance_memory(uuid): + def get_average_usage_instance_memory(resource): + uuid = resource.uuid mock = {} + # node 0 mock['INSTANCE_0'] = 2 mock['INSTANCE_1'] = 5 @@ -235,20 +250,18 @@ class FakeGnocchiMetrics(object): mock['INSTANCE_3'] = 8 mock['INSTANCE_4'] = 5 mock['INSTANCE_5'] = 16 - # node 3 mock['INSTANCE_6'] = 8 - # node 4 mock['INSTANCE_7'] = 4 - if uuid not in mock.keys(): - mock[uuid] = 10 return mock[str(uuid)] @staticmethod - def 
get_average_usage_instance_disk(uuid): + def get_average_usage_instance_disk(resource): + uuid = resource.uuid mock = {} + # node 0 mock['INSTANCE_0'] = 2 mock['INSTANCE_1'] = 2 @@ -258,49 +271,42 @@ class FakeGnocchiMetrics(object): mock['INSTANCE_3'] = 10 mock['INSTANCE_4'] = 15 mock['INSTANCE_5'] = 20 - # node 3 mock['INSTANCE_6'] = 8 - # node 4 mock['INSTANCE_7'] = 4 - if uuid not in mock.keys(): - mock[uuid] = 4 - return mock[str(uuid)] @staticmethod - def get_average_usage_instance_cpu_wb(uuid): + def get_average_usage_instance_cpu_wb(resource): """The last VM CPU usage values to average :param uuid: instance UUID :return: float value """ - # query influxdb stream - - # compute in stream - - # Normalize + uuid = resource.uuid mock = {} + # node 0 mock['INSTANCE_1'] = 80 mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 50 # node 1 mock['INSTANCE_3'] = 20 mock['INSTANCE_4'] = 10 + return float(mock[str(uuid)]) @staticmethod - def get_average_usage_instance_memory_wb(uuid): + def get_average_usage_instance_memory_wb(resource): + uuid = resource.uuid mock = {} + # node 0 mock['INSTANCE_1'] = 30 + mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 12 # node 1 mock['INSTANCE_3'] = 12 mock['INSTANCE_4'] = 12 - if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = 12 return mock[str(uuid)] diff --git a/watcher/tests/decision_engine/model/monasca_metrics.py b/watcher/tests/decision_engine/model/monasca_metrics.py index 9d2f63702..b2362eed6 100644 --- a/watcher/tests/decision_engine/model/monasca_metrics.py +++ b/watcher/tests/decision_engine/model/monasca_metrics.py @@ -16,8 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import oslo_utils - class FakeMonascaMetrics(object): def __init__(self): @@ -26,117 +24,37 @@ class FakeMonascaMetrics(object): def empty_one_metric(self, emptytype): self.emptytype = emptytype - def mock_get_statistics(self, resource_id=None, meter_name=None, - period=None, granularity=None, dimensions=None, - aggregation='avg', group_by='*'): - resource_id = dimensions.get( - "resource_id") or dimensions.get("hostname") + def mock_get_statistics(self, resource=None, resource_type=None, + meter_name=None, period=None, aggregate='mean', + granularity=None): result = 0.0 - if meter_name == "cpu.percent": - result = self.get_usage_node_cpu(resource_id) - elif meter_name == "vm.cpu.utilization_perc": - result = self.get_average_usage_instance_cpu(resource_id) - # elif meter_name == "hardware.memory.used": - # result = self.get_usage_node_ram(resource_id) - # elif meter_name == "memory.resident": - # result = self.get_average_usage_instance_memory(resource_id) - # elif meter_name == "hardware.ipmi.node.outlet_temperature": - # result = self.get_average_outlet_temperature(resource_id) - # elif meter_name == "hardware.ipmi.node.airflow": - # result = self.get_average_airflow(resource_id) - # elif meter_name == "hardware.ipmi.node.temperature": - # result = self.get_average_inlet_t(resource_id) - # elif meter_name == "hardware.ipmi.node.power": - # result = self.get_average_power(resource_id) + if meter_name == 'host_cpu_usage': + result = self.get_usage_compute_node_cpu(resource) + elif meter_name == 'instance_cpu_usage': + result = self.get_average_usage_instance_cpu(resource) return result - def mock_get_statistics_wb(self, meter_name, dimensions, period, - aggregate='avg'): - resource_id = dimensions.get( - "resource_id") or dimensions.get("hostname") + def mock_get_statistics_wb(self, resource=None, resource_type=None, + meter_name=None, period=None, aggregate='mean', + granularity=None): + """Statistics for workload balance strategy""" + result = 0.0 - if 
meter_name == "vm.cpu.utilization_perc": - result = self.get_average_usage_instance_cpu_wb(resource_id) + if meter_name == 'instance_cpu_usage': + result = self.get_average_usage_instance_cpu_wb(resource) return result @staticmethod - def get_average_outlet_temperature(uuid): - """The average outlet temperature for host""" - measurements = {} - measurements['Node_0'] = 30 - # use a big value to make sure it exceeds threshold - measurements['Node_1'] = 100 - if uuid not in measurements.keys(): - measurements[uuid] = 100 - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] - - @staticmethod - def get_usage_node_ram(uuid): - measurements = {} - # Monasca returns hardware.memory.used samples in KB. - measurements['Node_0'] = 7 * oslo_utils.units.Ki - measurements['Node_1'] = 5 * oslo_utils.units.Ki - measurements['Node_2'] = 29 * oslo_utils.units.Ki - measurements['Node_3'] = 8 * oslo_utils.units.Ki - measurements['Node_4'] = 4 * oslo_utils.units.Ki - - if uuid not in measurements.keys(): - # measurements[uuid] = random.randint(1, 4) - measurements[uuid] = 8 - - return float(measurements[str(uuid)]) - - @staticmethod - def get_average_airflow(uuid): - """The average outlet temperature for host""" - measurements = {} - measurements['Node_0'] = 400 - # use a big value to make sure it exceeds threshold - measurements['Node_1'] = 100 - if uuid not in measurements.keys(): - measurements[uuid] = 200 - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] - - @staticmethod - def get_average_inlet_t(uuid): - """The average outlet temperature for host""" - measurements = {} - measurements['Node_0'] = 24 - measurements['Node_1'] = 26 - if uuid not in measurements.keys(): - measurements[uuid] = 28 - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] - - @staticmethod - def get_average_power(uuid): - """The average outlet temperature for host""" - measurements = {} - measurements['Node_0'] = 
260 - measurements['Node_1'] = 240 - if uuid not in measurements.keys(): - measurements[uuid] = 200 - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] - - @staticmethod - def get_usage_node_cpu(*args, **kwargs): - uuid = args[0] - if type(uuid) is dict: - uuid = uuid.get("resource_id") or uuid.get("hostname") - uuid = uuid.rsplit('_', 2)[0] + def get_usage_compute_node_cpu(*args, **kwargs): """The last VM CPU usage values to average :param uuid:00 :return: """ - # query influxdb stream - # compute in stream + resource = args[0] + uuid = resource.uuid - # Normalize measurements = {} # node 0 measurements['Node_0'] = 7 @@ -147,7 +65,6 @@ class FakeMonascaMetrics(object): measurements['Node_3'] = 5 measurements['Node_4'] = 5 measurements['Node_5'] = 10 - # node 3 measurements['Node_6'] = 8 measurements['Node_19'] = 10 @@ -168,45 +85,18 @@ class FakeMonascaMetrics(object): value = float(sum(values)) / len(values) cpu_usage = value return cpu_usage - # return float(measurements[str(uuid)]) - - @staticmethod - def get_average_usage_instance_cpu_wb(uuid): - """The last VM CPU usage values to average - - :param uuid:00 - :return: - """ - # query influxdb stream - - # compute in stream - - # Normalize - measurements = {} - # node 0 - measurements['INSTANCE_1'] = 80 - measurements['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 50 - # node 1 - measurements['INSTANCE_3'] = 20 - measurements['INSTANCE_4'] = 10 - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] @staticmethod def get_average_usage_instance_cpu(*args, **kwargs): - uuid = args[0] - if type(uuid) is dict: - uuid = uuid.get("resource_id") or uuid.get("hostname") """The last VM CPU usage values to average :param uuid:00 :return: """ - # query influxdb stream - # compute in stream + resource = args[0] + uuid = resource.uuid - # Normalize measurements = {} # node 0 measurements['INSTANCE_0'] = 7 @@ -217,12 +107,11 @@ class FakeMonascaMetrics(object): 
measurements['INSTANCE_3'] = 5 measurements['INSTANCE_4'] = 5 measurements['INSTANCE_5'] = 10 - # node 3 measurements['INSTANCE_6'] = 8 - # node 4 measurements['INSTANCE_7'] = 4 + if uuid not in measurements.keys(): # measurements[uuid] = random.randint(1, 4) measurements[uuid] = 8 @@ -237,54 +126,3 @@ class FakeMonascaMetrics(object): value = float(sum(values)) / len(values) cpu_usage = value return cpu_usage - - @staticmethod - def get_average_usage_instance_memory(uuid): - measurements = {} - # node 0 - measurements['INSTANCE_0'] = 2 - measurements['INSTANCE_1'] = 5 - # node 1 - measurements['INSTANCE_2'] = 5 - # node 2 - measurements['INSTANCE_3'] = 8 - measurements['INSTANCE_4'] = 5 - measurements['INSTANCE_5'] = 16 - - # node 3 - measurements['INSTANCE_6'] = 8 - - # node 4 - measurements['INSTANCE_7'] = 4 - if uuid not in measurements.keys(): - # measurements[uuid] = random.randint(1, 4) - measurements[uuid] = 10 - - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] - - @staticmethod - def get_average_usage_instance_disk(uuid): - measurements = {} - # node 0 - measurements['INSTANCE_0'] = 2 - measurements['INSTANCE_1'] = 2 - # node 1 - measurements['INSTANCE_2'] = 2 - # node 2 - measurements['INSTANCE_3'] = 10 - measurements['INSTANCE_4'] = 15 - measurements['INSTANCE_5'] = 20 - - # node 3 - measurements['INSTANCE_6'] = 8 - - # node 4 - measurements['INSTANCE_7'] = 4 - - if uuid not in measurements.keys(): - # measurements[uuid] = random.randint(1, 4) - measurements[uuid] = 4 - - return [{'columns': ['avg'], - 'statistics': [[float(measurements[str(uuid)])]]}] diff --git a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py index f4332022d..0ce306966 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py 
@@ -62,7 +62,7 @@ class TestBasicConsolidation(TestBaseStrategy): self.addCleanup(p_datasource.stop) self.m_datasource.return_value = mock.Mock( - get_host_cpu_usage=self.fake_metrics.get_usage_node_cpu, + get_host_cpu_usage=self.fake_metrics.get_usage_compute_node_cpu, get_instance_cpu_usage=self.fake_metrics. get_average_usage_instance_cpu ) @@ -75,7 +75,7 @@ class TestBasicConsolidation(TestBaseStrategy): size_cluster_assert = 5 self.assertEqual(size_cluster_assert, size_cluster) - def test_basic_consolidation_score_node(self): + def test_basic_consolidation_score_compute_node(self): model = self.fake_c_cluster.generate_scenario_1() self.m_c_model.return_value = model node_1_score = 0.023333333333333317 @@ -96,7 +96,6 @@ class TestBasicConsolidation(TestBaseStrategy): self.assertEqual( instance_0_score, self.strategy.calculate_score_instance(instance_0)) - instance_1 = model.get_instance_by_uuid("INSTANCE_1") instance_1_score = 0.023333333333333317 self.assertEqual( @@ -236,69 +235,3 @@ class TestBasicConsolidation(TestBaseStrategy): loaded_action = loader.load(action['action_type']) loaded_action.input_parameters = action['input_parameters'] loaded_action.validate_parameters() - - """def test_periods(self): - model = self.fake_c_cluster.generate_scenario_1() - self.m_c_model.return_value = model - node_1 = model.get_node_by_uuid("Node_1") - p_ceilometer = mock.patch.object( - strategies.BasicConsolidation, "ceilometer") - m_ceilometer = p_ceilometer.start() - self.addCleanup(p_ceilometer.stop) - p_monasca = mock.patch.object(strategies.BasicConsolidation, "monasca") - m_monasca = p_monasca.start() - self.addCleanup(p_monasca.stop) - p_gnocchi = mock.patch.object(strategies.BasicConsolidation, "gnocchi") - m_gnocchi = p_gnocchi.start() - self.addCleanup(p_gnocchi.stop) - datetime_patcher = mock.patch.object( - datetime, 'datetime', - mock.Mock(wraps=datetime.datetime) - ) - mocked_datetime = datetime_patcher.start() - mocked_datetime.utcnow.return_value =
datetime.datetime( - 2017, 3, 19, 18, 53, 11, 657417) - self.addCleanup(datetime_patcher.stop) - m_monasca.return_value = mock.Mock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - m_ceilometer.return_value = mock.Mock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - m_gnocchi.return_value = mock.Mock( - statistic_aggregation=self.fake_metrics.mock_get_statistics) - self.strategy.calculate_score_node(node_1) - resource_id = "%s_%s" % (node_1.uuid, node_1.hostname) - if self.strategy.config.datasource == "ceilometer": - m_ceilometer.statistic_aggregation.assert_called_with( - aggregate='avg', meter_name='compute.node.cpu.percent', - period=7200, resource_id=resource_id) - elif self.strategy.config.datasource == "monasca": - m_monasca.statistic_aggregation.assert_called_with( - aggregate='avg', meter_name='cpu.percent', - period=7200, dimensions={'hostname': 'Node_1'}) - elif self.strategy.config.datasource == "gnocchi": - stop_time = datetime.datetime.utcnow() - start_time = stop_time - datetime.timedelta( - seconds=int('7200')) - m_gnocchi.statistic_aggregation.assert_called_with( - resource_id=resource_id, metric='compute.node.cpu.percent', - granularity=300, start_time=start_time, stop_time=stop_time, - aggregation='mean') - - self.strategy.input_parameters.update({"period": 600}) - self.strategy.calculate_score_node(node_1) - if self.strategy.config.datasource == "ceilometer": - m_ceilometer.statistic_aggregation.assert_called_with( - aggregate='avg', meter_name='compute.node.cpu.percent', - period=600, resource_id=resource_id) - elif self.strategy.config.datasource == "monasca": - m_monasca.statistic_aggregation.assert_called_with( - aggregate='avg', meter_name='cpu.percent', - period=600, dimensions={'hostname': 'Node_1'}) - elif self.strategy.config.datasource == "gnocchi": - stop_time = datetime.datetime.utcnow() - start_time = stop_time - datetime.timedelta( - seconds=int('600')) - 
m_gnocchi.statistic_aggregation.assert_called_with( - resource_id=resource_id, metric='compute.node.cpu.percent', - granularity=300, start_time=start_time, stop_time=stop_time, - aggregation='mean')""" diff --git a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py index a5575b9ee..c41824b53 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py @@ -73,8 +73,8 @@ class TestOutletTempControl(TestBaseStrategy): model = self.fake_c_cluster.generate_scenario_3_with_2_nodes() self.m_c_model.return_value = model n1, n2 = self.strategy.group_hosts_by_outlet_temp() - self.assertEqual('Node_1', n1[0]['node'].uuid) - self.assertEqual('Node_0', n2[0]['node'].uuid) + self.assertEqual('Node_1', n1[0]['compute_node'].uuid) + self.assertEqual('Node_0', n2[0]['compute_node'].uuid) def test_choose_instance_to_migrate(self): model = self.fake_c_cluster.generate_scenario_3_with_2_nodes() @@ -92,7 +92,7 @@ class TestOutletTempControl(TestBaseStrategy): instance_to_mig = self.strategy.choose_instance_to_migrate(n1) dest_hosts = self.strategy.filter_dest_servers(n2, instance_to_mig[1]) self.assertEqual(1, len(dest_hosts)) - self.assertEqual('Node_0', dest_hosts[0]['node'].uuid) + self.assertEqual('Node_0', dest_hosts[0]['compute_node'].uuid) def test_execute_no_workload(self): model = self.fake_c_cluster.\ diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index a4a12cc8c..70fdb19e8 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -55,13 +55,13 @@ class TestWorkloadBalance(TestBaseStrategy): self.strategy = 
strategies.WorkloadBalance( config=mock.Mock(datasource=self.datasource)) self.strategy.input_parameters = utils.Struct() - self.strategy.input_parameters.update({'metrics': 'cpu_util', + self.strategy.input_parameters.update({'metrics': 'instance_cpu_usage', 'threshold': 25.0, 'period': 300, 'granularity': 300}) self.strategy.threshold = 25.0 self.strategy._period = 300 - self.strategy._meter = "cpu_util" + self.strategy._meter = 'instance_cpu_usage' self.strategy._granularity = 300 def test_calc_used_resource(self): @@ -78,18 +78,18 @@ class TestWorkloadBalance(TestBaseStrategy): self.m_c_model.return_value = model self.strategy.threshold = 30 n1, n2, avg, w_map = self.strategy.group_hosts_by_cpu_or_ram_util() - self.assertEqual(n1[0]['node'].uuid, 'Node_0') - self.assertEqual(n2[0]['node'].uuid, 'Node_1') + self.assertEqual(n1[0]['compute_node'].uuid, 'Node_0') + self.assertEqual(n2[0]['compute_node'].uuid, 'Node_1') self.assertEqual(avg, 8.0) def test_group_hosts_by_ram_util(self): model = self.fake_c_cluster.generate_scenario_6_with_2_nodes() self.m_c_model.return_value = model - self.strategy._meter = "memory.resident" + self.strategy._meter = 'instance_ram_usage' self.strategy.threshold = 30 n1, n2, avg, w_map = self.strategy.group_hosts_by_cpu_or_ram_util() - self.assertEqual(n1[0]['node'].uuid, 'Node_0') - self.assertEqual(n2[0]['node'].uuid, 'Node_1') + self.assertEqual(n1[0]['compute_node'].uuid, 'Node_0') + self.assertEqual(n2[0]['compute_node'].uuid, 'Node_1') self.assertEqual(avg, 33.0) def test_choose_instance_to_migrate(self): @@ -123,7 +123,7 @@ class TestWorkloadBalance(TestBaseStrategy): dest_hosts = self.strategy.filter_destination_hosts( n2, instance_to_mig[1], avg, w_map) self.assertEqual(len(dest_hosts), 1) - self.assertEqual(dest_hosts[0]['node'].uuid, 'Node_1') + self.assertEqual(dest_hosts[0]['compute_node'].uuid, 'Node_1') def test_execute_no_workload(self): model = self.fake_c_cluster.\ diff --git 
a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py index 9ee7484d8..c576629fe 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py @@ -46,11 +46,16 @@ class TestWorkloadStabilization(TestBaseStrategy): self.fake_metrics = self.fake_datasource_cls() self.hosts_load_assert = { - 'Node_0': {'cpu_util': 0.07, 'memory.resident': 7.0, 'vcpus': 40}, - 'Node_1': {'cpu_util': 0.07, 'memory.resident': 5, 'vcpus': 40}, - 'Node_2': {'cpu_util': 0.8, 'memory.resident': 29, 'vcpus': 40}, - 'Node_3': {'cpu_util': 0.05, 'memory.resident': 8, 'vcpus': 40}, - 'Node_4': {'cpu_util': 0.05, 'memory.resident': 4, 'vcpus': 40}} + 'Node_0': {'instance_cpu_usage': 0.07, + 'instance_ram_usage': 7.0, 'vcpus': 40}, + 'Node_1': {'instance_cpu_usage': 0.07, + 'instance_ram_usage': 5, 'vcpus': 40}, + 'Node_2': {'instance_cpu_usage': 0.8, + 'instance_ram_usage': 29, 'vcpus': 40}, + 'Node_3': {'instance_cpu_usage': 0.05, + 'instance_ram_usage': 8, 'vcpus': 40}, + 'Node_4': {'instance_cpu_usage': 0.05, + 'instance_ram_usage': 4, 'vcpus': 40}} p_osc = mock.patch.object( clients, "OpenStackClients") @@ -70,28 +75,32 @@ class TestWorkloadStabilization(TestBaseStrategy): config=mock.Mock(datasource=self.datasource)) self.strategy.input_parameters = utils.Struct() self.strategy.input_parameters.update( - {'metrics': ["cpu_util", "memory.resident"], - 'thresholds': {"cpu_util": 0.2, "memory.resident": 0.2}, - 'weights': {"cpu_util_weight": 1.0, - "memory.resident_weight": 1.0}, + {'metrics': ["instance_cpu_usage", "instance_ram_usage"], + 'thresholds': {"instance_cpu_usage": 0.2, + "instance_ram_usage": 0.2}, + 'weights': {"instance_cpu_usage_weight": 1.0, + "instance_ram_usage_weight": 1.0}, 'instance_metrics': - {"cpu_util": "compute.node.cpu.percent", - 
"memory.resident": "hardware.memory.used"}, + {"instance_cpu_usage": "host_cpu_usage", + "instance_ram_usage": "host_ram_usage"}, 'host_choice': 'retry', 'retry_count': 1, - 'periods': {"instance": 720, "node": 600}, - 'aggregation_method': {"instance": "mean", "node": "mean"}}) - self.strategy.metrics = ["cpu_util", "memory.resident"] - self.strategy.thresholds = {"cpu_util": 0.2, "memory.resident": 0.2} - self.strategy.weights = {"cpu_util_weight": 1.0, - "memory.resident_weight": 1.0} + 'periods': {"instance": 720, "compute_node": 600}, + 'aggregation_method': {"instance": "mean", + "compute_node": "mean"}}) + self.strategy.metrics = ["instance_cpu_usage", "instance_ram_usage"] + self.strategy.thresholds = {"instance_cpu_usage": 0.2, + "instance_ram_usage": 0.2} + self.strategy.weights = {"instance_cpu_usage_weight": 1.0, + "instance_ram_usage_weight": 1.0} self.strategy.instance_metrics = { - "cpu_util": "compute.node.cpu.percent", - "memory.resident": "hardware.memory.used"} + "instance_cpu_usage": "host_cpu_usage", + "instance_ram_usage": "host_ram_usage"} self.strategy.host_choice = 'retry' self.strategy.retry_count = 1 - self.strategy.periods = {"instance": 720, "node": 600} - self.strategy.aggregation_method = {"instance": "mean", "node": "mean"} + self.strategy.periods = {"instance": 720, "compute_node": 600} + self.strategy.aggregation_method = {"instance": "mean", + "compute_node": "mean"} def test_get_instance_load(self): model = self.fake_c_cluster.generate_scenario_1() @@ -99,7 +108,7 @@ class TestWorkloadStabilization(TestBaseStrategy): instance0 = model.get_instance_by_uuid("INSTANCE_0") instance_0_dict = { 'uuid': 'INSTANCE_0', 'vcpus': 10, - 'cpu_util': 0.07, 'memory.resident': 2} + 'instance_cpu_usage': 0.07, 'instance_ram_usage': 2} self.assertEqual( instance_0_dict, self.strategy.get_instance_load(instance0)) @@ -112,14 +121,16 @@ class TestWorkloadStabilization(TestBaseStrategy): def test_normalize_hosts_load(self): 
self.m_c_model.return_value = self.fake_c_cluster.generate_scenario_1() - fake_hosts = {'Node_0': {'cpu_util': 0.07, 'memory.resident': 7}, - 'Node_1': {'cpu_util': 0.05, 'memory.resident': 5}} + fake_hosts = {'Node_0': {'instance_cpu_usage': 0.07, + 'instance_ram_usage': 7}, + 'Node_1': {'instance_cpu_usage': 0.05, + 'instance_ram_usage': 5}} normalized_hosts = {'Node_0': - {'cpu_util': 0.07, - 'memory.resident': 0.05303030303030303}, + {'instance_cpu_usage': 0.07, + 'instance_ram_usage': 0.05303030303030303}, 'Node_1': - {'cpu_util': 0.05, - 'memory.resident': 0.03787878787878788}} + {'instance_cpu_usage': 0.05, + 'instance_ram_usage': 0.03787878787878788}} self.assertEqual( normalized_hosts, self.strategy.normalize_hosts_load(fake_hosts)) @@ -147,11 +158,11 @@ class TestWorkloadStabilization(TestBaseStrategy): test_ram_sd = 9.3 self.assertEqual( round(self.strategy.get_sd( - self.hosts_load_assert, 'cpu_util'), 3), + self.hosts_load_assert, 'instance_cpu_usage'), 3), test_cpu_sd) self.assertEqual( round(self.strategy.get_sd( - self.hosts_load_assert, 'memory.resident'), 1), + self.hosts_load_assert, 'instance_ram_usage'), 1), test_ram_sd) def test_calculate_weighted_sd(self): @@ -167,8 +178,9 @@ class TestWorkloadStabilization(TestBaseStrategy): result = self.strategy.calculate_migration_case( self.hosts_load_assert, instance, src_node, dst_node)[-1][dst_node.uuid] - result['cpu_util'] = round(result['cpu_util'], 3) - self.assertEqual(result, {'cpu_util': 0.095, 'memory.resident': 21.0, + result['instance_cpu_usage'] = round(result['instance_cpu_usage'], 3) + self.assertEqual(result, {'instance_cpu_usage': 0.095, + 'instance_ram_usage': 21.0, 'vcpus': 40}) def test_simulate_migrations(self): @@ -191,13 +203,15 @@ class TestWorkloadStabilization(TestBaseStrategy): def test_check_threshold(self): self.m_c_model.return_value = self.fake_c_cluster.generate_scenario_1() - self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} + 
self.strategy.thresholds = {'instance_cpu_usage': 0.001, + 'instance_ram_usage': 0.2} self.strategy.simulate_migrations = mock.Mock(return_value=True) self.assertTrue(self.strategy.check_threshold()) def test_execute_one_migration(self): self.m_c_model.return_value = self.fake_c_cluster.generate_scenario_1() - self.strategy.thresholds = {'cpu_util': 0.001, 'memory.resident': 0.2} + self.strategy.thresholds = {'instance_cpu_usage': 0.001, + 'instance_ram_usage': 0.2} self.strategy.simulate_migrations = mock.Mock( return_value=[ {'instance': 'INSTANCE_4', 's_host': 'Node_2', @@ -210,8 +224,8 @@ class TestWorkloadStabilization(TestBaseStrategy): def test_execute_multiply_migrations(self): self.m_c_model.return_value = self.fake_c_cluster.generate_scenario_1() - self.strategy.thresholds = {'cpu_util': 0.00001, - 'memory.resident': 0.0001} + self.strategy.thresholds = {'instance_cpu_usage': 0.00001, + 'instance_ram_usage': 0.0001} self.strategy.simulate_migrations = mock.Mock( return_value=[ {'instance': 'INSTANCE_4', 's_host': 'Node_2', @@ -225,8 +239,8 @@ class TestWorkloadStabilization(TestBaseStrategy): def test_execute_nothing_to_migrate(self): self.m_c_model.return_value = self.fake_c_cluster.generate_scenario_1() - self.strategy.thresholds = {'cpu_util': 0.042, - 'memory.resident': 0.0001} + self.strategy.thresholds = {'instance_cpu_usage': 0.042, + 'instance_ram_usage': 0.0001} self.strategy.simulate_migrations = mock.Mock(return_value=False) self.strategy.instance_migrations_count = 0 with mock.patch.object(self.strategy, 'migrate') as mock_migrate: