Grafana proxy datasource to retrieve metrics

New datasource to retrieve metrics that can be configured in a
flexible way depending on the deployment. The current implementation only
works with InfluxDB. Slight changes to datasource manager were
necessary because grafana metric_map can only be built at runtime.

The yaml configuration file can still be used to define metrics
but will require that five different attributes are specified per
metric.

Specific databases accessible through grafana can be defined by
creating 'translators' for these specific databases. This patch
introduces a base class for these translators and their methods.

In addition the first translator specific for InfluxDB is
created.

Depends-on: I68475883529610e514aa82f1881105ab0cf24ec3
Depends-on: If1f27dc01e853c5b24bdb21f1e810f64eaee2e5c
Implements: blueprint grafana-proxy-datasource
Change-Id: Ib12b6a7882703e84a27c301e821c1a034b192508
This commit is contained in:
Dantali0n
2019-04-02 16:20:02 +02:00
parent f8fef7d774
commit 0541d8c25b
10 changed files with 1121 additions and 5 deletions

View File

@@ -0,0 +1,251 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log
import six.moves.urllib.parse as urlparse
from watcher.common import clients
from watcher.common import exception
from watcher.datasources import base
from watcher.datasources.grafana_translator import influxdb
import requests
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class GrafanaHelper(base.DataSourceBase):
    """Datasource for metrics proxied through a Grafana instance

    The available metrics are configured at deploy time via the
    grafana_client config options; every metric requires five attributes
    (project, database, attribute, translator and query) to be usable.
    """

    NAME = 'grafana'

    # METRIC_MAP is only available at runtime, it is populated by
    # _build_metric_map from the grafana_client config options.
    METRIC_MAP = dict()

    # All available translators
    TRANSLATOR_LIST = [
        influxdb.InfluxDBGrafanaTranslator.NAME
    ]

    def __init__(self, osc=None):
        """:param osc: an OpenStackClients instance"""
        self.osc = osc if osc else clients.OpenStackClients()
        self.nova = self.osc.nova()
        self.configured = False
        self._base_url = None
        self._headers = None
        self._setup()

    def _setup(self):
        """Configure grafana helper to perform requests

        Sets self.configured to True only when the token and base_url are
        present and the url has a scheme, netloc and path; otherwise logs
        critically and leaves the datasource unconfigured.
        """
        token = CONF.grafana_client.token
        base_url = CONF.grafana_client.base_url
        if not token:
            LOG.critical("GrafanaHelper authentication token not configured")
            return
        self._headers = {"Authorization": "Bearer " + token,
                         "Content-Type": "Application/json"}

        if not base_url:
            LOG.critical("GrafanaHelper url not properly configured, "
                         "check base_url")
            return
        self._base_url = base_url

        # Very basic url parsing: scheme, netloc and path must all be
        # non-empty. NOTE: the previous identity comparison (`is ''`) only
        # worked through CPython string interning; equality/truthiness is
        # the correct test.
        parse = urlparse.urlparse(self._base_url)
        if not parse.scheme or not parse.netloc or not parse.path:
            LOG.critical("GrafanaHelper url not properly configured, "
                         "check base_url and project_id")
            return

        self._build_metric_map()

        if len(self.METRIC_MAP) == 0:
            LOG.critical("GrafanaHelper not configured for any metrics")

        self.configured = True

    def _build_metric_map(self):
        """Build the metric map by reading config information

        A metric is only added to the map when all five of its attributes
        are configured and the translator is known; incompletely
        configured metrics are logged and skipped.
        """
        for key, value in CONF.grafana_client.database_map.items():
            try:
                project = CONF.grafana_client.project_id_map[key]
                attribute = CONF.grafana_client.attribute_map[key]
                translator = CONF.grafana_client.translator_map[key]
                query = CONF.grafana_client.query_map[key]
                if project is not None and \
                        value is not None and \
                        translator in self.TRANSLATOR_LIST and \
                        query is not None:
                    self.METRIC_MAP[key] = {
                        'db': value,
                        'project': project,
                        'attribute': attribute,
                        'translator': translator,
                        'query': query
                    }
            except KeyError as e:
                # One of the maps has no entry for this metric.
                LOG.error(e)

    def _build_translator_schema(self, metric, db, attribute, query, resource,
                                 resource_type, period, aggregate,
                                 granularity):
        """Create dictionary to pass to grafana proxy translators"""
        return {'metric': metric, 'db': db, 'attribute': attribute,
                'query': query, 'resource': resource,
                'resource_type': resource_type, 'period': period,
                'aggregate': aggregate, 'granularity': granularity}

    def _get_translator(self, name, data):
        """Use the name of a translator to construct it for the metric

        :raises InvalidParameter: when no translator with that name exists
        """
        if name == influxdb.InfluxDBGrafanaTranslator.NAME:
            return influxdb.InfluxDBGrafanaTranslator(data)
        else:
            raise exception.InvalidParameter(
                parameter='name', parameter_type='grafana translator')

    def _request(self, params, project_id):
        """Make the request to the endpoint to retrieve data

        If the request fails, determines what error to raise.

        :raises DataSourceNotAvailable: when unconfigured or when the
            response status is anything other than 200.
        """
        if self.configured is False:
            raise exception.DataSourceNotAvailable(self.NAME)

        # NOTE(review): no timeout is passed, a stalled endpoint could
        # block indefinitely -- consider adding timeout= here.
        resp = requests.get(self._base_url + str(project_id) + '/query',
                            params=params, headers=self._headers)
        if resp.status_code == 200:
            return resp
        elif resp.status_code == 400:
            LOG.error("Query for metric is invalid")
        elif resp.status_code == 401:
            LOG.error("Authorization token is invalid")
        raise exception.DataSourceNotAvailable(self.NAME)

    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Get the value for the specific metric based on specified parameters

        :raises MetricNotAvailable: when meter_name is not in METRIC_MAP
        """
        if meter_name not in self.METRIC_MAP:
            LOG.error("Metric: {0} does not appear in the current Grafana "
                      "metric map".format(meter_name))
            raise exception.MetricNotAvailable(metric=meter_name)

        # Look the metric up once instead of once per attribute.
        metric = self.METRIC_MAP[meter_name]
        db = metric['db']
        project = metric['project']
        attribute = metric['attribute']
        translator_name = metric['translator']
        query = metric['query']

        data = self._build_translator_schema(
            meter_name, db, attribute, query, resource, resource_type, period,
            aggregate, granularity)

        translator = self._get_translator(translator_name, data)

        params = translator.build_params()

        raw_kwargs = dict(
            params=params,
            project_id=project,
        )
        # Drop falsy values so only set arguments reach query_retry.
        kwargs = {k: v for k, v in raw_kwargs.items() if k and v}
        resp = self.query_retry(self._request, **kwargs)

        result = translator.extract_result(resp.content)
        return result

    def get_host_cpu_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_cpu_usage', period, aggregate,
            granularity)

    def get_host_ram_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_ram_usage', period, aggregate,
            granularity)

    def get_host_outlet_temp(self, resource, period=300,
                             aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_outlet_temp', period, aggregate,
            granularity)

    def get_host_inlet_temp(self, resource, period=300,
                            aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_inlet_temp', period, aggregate,
            granularity)

    def get_host_airflow(self, resource, period=300,
                         aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_airflow', period, aggregate,
            granularity)

    def get_host_power(self, resource, period=300,
                       aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_power', period, aggregate,
            granularity)

    def get_instance_cpu_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_cpu_usage', period, aggregate,
            granularity)

    def get_instance_ram_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_usage', period, aggregate,
            granularity)

    def get_instance_ram_allocated(self, resource, period=300,
                                   aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_allocated', period, aggregate,
            granularity)

    def get_instance_l3_cache_usage(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_l3_cache_usage', period, aggregate,
            granularity)

    def get_instance_root_disk_size(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_root_disk_size', period, aggregate,
            granularity)

View File

@@ -0,0 +1,125 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from watcher._i18n import _
from watcher.common import exception
from watcher.datasources import base
class BaseGrafanaTranslator(object):
    """Grafana translator baseclass to use with grafana for different databases

    Specific databases that are proxied through grafana require some
    alterations depending on the database.

    The data dictionary supplied on construction has the following layout:

    data {
        metric: name of the metric as found in DataSourceBase.METRIC_MAP,
        db: database specified for this metric in grafana_client config
            options,
        attribute: the piece of information that will be selected from the
            resource object to build the query,
        query: the unformatted query from the configuration for this metric,
        resource: the object from the OpenStackClient,
        resource_type: the type of the resource
            ['compute_node', 'instance', 'bare_metal', 'storage'],
        period: the period of time to collect metrics for in seconds,
        aggregate: the aggregation can be any from ['mean', 'max', 'min',
            'count'],
        granularity: interval between datapoints in seconds (optional),
    }
    """

    # Every grafana translator should have a uniquely identifying name.
    NAME = ''

    RESOURCE_TYPES = base.DataSourceBase.RESOURCE_TYPES

    AGGREGATES = base.DataSourceBase.AGGREGATES

    def __init__(self, data):
        """:param data: dictionary as described in the class docstring"""
        self._data = data
        self._validate_data()

    def _validate_data(self):
        """Iterate through the supplied data and verify attributes

        :raises InvalidParameter: when a required key is missing or None,
            or when resource_type / aggregate has an unsupported value.
        """
        optionals = ['granularity']
        reference_data = {
            'metric': None,
            'db': None,
            'attribute': None,
            'query': None,
            'resource': None,
            'resource_type': None,
            'period': None,
            'aggregate': None,
            'granularity': None
        }
        # Overlay the supplied data; anything missing stays None and is
        # caught by the loop below.
        reference_data.update(self._data)
        for key, value in reference_data.items():
            if value is None and key not in optionals:
                raise exception.InvalidParameter(
                    message=(_("The value %(value)s for parameter "
                               "%(parameter)s is invalid") % {'value': None,
                                                              'parameter': key}
                             )
                )

        if reference_data['resource_type'] not in self.RESOURCE_TYPES:
            raise exception.InvalidParameter(parameter='resource_type',
                                             parameter_type='RESOURCE_TYPES')

        if reference_data['aggregate'] not in self.AGGREGATES:
            raise exception.InvalidParameter(parameter='aggregate',
                                             parameter_type='AGGREGATES')

    @staticmethod
    def _extract_attribute(resource, attribute):
        """Retrieve the desired attribute from the resource

        :param resource: The resource object to extract the attribute from.
        :param attribute: The name of the attribute as string.
        :return: The extracted attribute
        :raises AttributeError: when the resource has no such attribute
        """
        # getattr already raises AttributeError on failure; the previous
        # try/except that immediately re-raised was redundant.
        return getattr(resource, attribute)

    @staticmethod
    def _query_format(query, aggregate, resource, period,
                      granularity, translator_specific):
        """Fill the unformatted query with the positional {0}..{4} values"""
        return query.format(aggregate, resource, period, granularity,
                            translator_specific)

    @abc.abstractmethod
    def build_params(self):
        """Build the set of parameters to send with the request"""
        raise NotImplementedError()

    @abc.abstractmethod
    def extract_result(self, raw_results):
        """Extrapolate the metric from the raw results of the request"""
        raise NotImplementedError()

View File

@@ -0,0 +1,87 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
from watcher.common import exception
from watcher.datasources.grafana_translator.base import BaseGrafanaTranslator
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class InfluxDBGrafanaTranslator(BaseGrafanaTranslator):
    """Grafana translator to communicate with InfluxDB database"""

    NAME = 'influxdb'

    def build_params(self):
        """Build the parameters for a request to an InfluxDB backed proxy

        :return: dict with the database name, epoch timestamp format and
            the fully formatted query.
        :raises AttributeError: when the resource misses the configured
            attribute.
        """
        data = self._data

        retention_period = None
        # Pick the shortest configured retention period that still covers
        # the desired period.
        available_periods = CONF.grafana_translators.retention_periods.items()
        for key, value in sorted(available_periods, key=lambda x: x[1]):
            if int(data['period']) < int(value):
                retention_period = key
                break

        if retention_period is None:
            # Fall back to the longest configured retention period.
            # NOTE: select by duration (the value); a plain max() over the
            # items would compare the period *names* alphabetically.
            retention_period = max(
                available_periods, key=lambda x: x[1])[0]
            LOG.warning("Longest retention period is to short for desired"
                        " period")

        try:
            resource = self._extract_attribute(
                data['resource'], data['attribute'])
        except AttributeError:
            LOG.error("Resource: {0} does not contain attribute {1}".format(
                data['resource'], data['attribute']))
            raise

        # Granularity is optional; if it is None the minimal value for
        # InfluxDB will be 1.
        granularity = \
            data['granularity'] if data['granularity'] is not None else 1

        return {'db': data['db'],
                'epoch': 'ms',
                'q': self._query_format(
                    data['query'], data['aggregate'], resource, data['period'],
                    granularity, retention_period)}

    def extract_result(self, raw_results):
        """Extract the aggregated value from the raw InfluxDB JSON response

        :param raw_results: JSON body as returned by the proxy.
        :return: the value in the column matching the requested aggregate.
        :raises NoSuchMetricForHost: when the response lacks results.
        """
        try:
            # For result structure see:
            # https://docs.openstack.org/watcher/latest/datasources/grafana.html#InfluxDB
            result = jsonutils.loads(raw_results)
            result = result['results'][0]['series'][0]
            index_aggregate = result['columns'].index(self._data['aggregate'])
            return result['values'][0][index_aggregate]
        except KeyError:
            LOG.error("Could not extract {0} for the resource: {1}".format(
                self._data['metric'], self._data['resource']))
            raise exception.NoSuchMetricForHost(
                metric=self._data['metric'], host=self._data['resource'])

View File

@@ -23,6 +23,7 @@ from oslo_log import log
from watcher.common import exception
from watcher.datasources import ceilometer as ceil
from watcher.datasources import gnocchi as gnoc
from watcher.datasources import grafana as graf
from watcher.datasources import monasca as mon
LOG = log.getLogger(__name__)
@@ -34,6 +35,7 @@ class DataSourceManager(object):
(gnoc.GnocchiHelper.NAME, gnoc.GnocchiHelper.METRIC_MAP),
(ceil.CeilometerHelper.NAME, ceil.CeilometerHelper.METRIC_MAP),
(mon.MonascaHelper.NAME, mon.MonascaHelper.METRIC_MAP),
(graf.GrafanaHelper.NAME, graf.GrafanaHelper.METRIC_MAP),
])
"""Dictionary with all possible datasources, dictionary order is the default
order for attempting to use datasources
@@ -45,6 +47,11 @@ class DataSourceManager(object):
self._ceilometer = None
self._monasca = None
self._gnocchi = None
self._grafana = None
# Dynamically update grafana metric map, only available at runtime
# The metric map can still be overridden by a yaml config file
self.metric_map[graf.GrafanaHelper.NAME] = self.grafana.METRIC_MAP
metric_map_path = cfg.CONF.watcher_decision_engine.metric_map_path
metrics_from_file = self.load_metric_map(metric_map_path)
@@ -54,6 +61,7 @@ class DataSourceManager(object):
except KeyError:
msgargs = (ds, self.metric_map.keys())
LOG.warning('Invalid Datasource: %s. Allowed: %s ', *msgargs)
self.datasources = self.config.datasources
@property
@@ -86,6 +94,16 @@ class DataSourceManager(object):
def gnocchi(self, gnocchi):
self._gnocchi = gnocchi
    @property
    def grafana(self):
        """Lazily construct the GrafanaHelper on first access.

        Nothing is configured or requested unless grafana is actually used.
        """
        if self._grafana is None:
            self._grafana = graf.GrafanaHelper(osc=self.osc)
        return self._grafana

    @grafana.setter
    def grafana(self, grafana):
        # Allows tests and callers to inject a preconfigured helper.
        self._grafana = grafana
def get_backend(self, metrics):
"""Determine the datasource to use from the configuration

View File

@@ -0,0 +1,105 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from oslo_log import log
from watcher.common import exception
from watcher.datasources.grafana_translator import base as base_translator
from watcher.tests import base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class TestGrafanaTranslatorBase(base.BaseTestCase):
    """Shared fixtures for all GrafanaTranslator test classes

    Objects under test are preceded with t_ and mocked objects are preceded
    with m_ , additionally, patched objects are preceded with p_ no object
    under test should be created in setUp this can influence the results.
    """

    def setUp(self):
        super(TestGrafanaTranslatorBase, self).setUp()

        # Basic valid reference data for constructing translators.
        self.reference_data = dict(
            metric='host_cpu_usage',
            db='production',
            attribute='hostname',
            query='SHOW all_base FROM belong_to_us',
            resource=mock.Mock(hostname='hyperion'),
            resource_type='compute_node',
            period='120',
            aggregate='mean',
            granularity=None,
        )
class TestBaseGrafanaTranslator(TestGrafanaTranslatorBase):
    """Test the GrafanaTranslator base class

    Objects under test are preceded with t_ and mocked objects are preceded
    with m_ , additionally, patched objects are preceded with p_ no object
    under test should be created in setUp this can influence the results.
    """

    def setUp(self):
        super(TestBaseGrafanaTranslator, self).setUp()

    def test_validate_data(self):
        """Initialize BaseGrafanaTranslator and check data validation"""
        t_base_translator = base_translator.BaseGrafanaTranslator(
            data=self.reference_data)

        self.assertIsInstance(t_base_translator,
                              base_translator.BaseGrafanaTranslator)

    def test_validate_data_error(self):
        """Verify that invalid data raises on construction"""
        self.assertRaises(exception.InvalidParameter,
                          base_translator.BaseGrafanaTranslator,
                          data=[])

    def test_extract_attribute(self):
        """Test that an attribute can be extracted from an object"""
        m_object = mock.Mock(hostname='test')

        t_base_translator = base_translator.BaseGrafanaTranslator(
            data=self.reference_data)

        self.assertEqual('test', t_base_translator._extract_attribute(
            m_object, 'hostname'))

    def test_extract_attribute_error(self):
        """Test error on attempt to extract non-existing attribute"""
        # spec restricts the mock to 'hostname' so that accessing any other
        # attribute raises AttributeError (a PropertyMock assigned as a
        # plain attribute would simply be returned, never raise).
        m_object = mock.Mock(spec=['hostname'], hostname='test')

        t_base_translator = base_translator.BaseGrafanaTranslator(
            data=self.reference_data)

        # Pass the callable and its arguments separately; calling it
        # inline would raise before assertRaises could intercept anything.
        self.assertRaises(AttributeError,
                          t_base_translator._extract_attribute,
                          m_object, 'test')

View File

@@ -0,0 +1,175 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import mock
from oslo_config import cfg
from oslo_log import log
from watcher.common import exception
from watcher.datasources.grafana_translator import influxdb
from watcher.tests.datasources.grafana_translators import test_base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class TestInfluxDBGrafanaTranslator(test_base.TestGrafanaTranslatorBase):
    """Test the InfluxDB grafana database translator

    Objects under test are preceded with t_ and mocked objects are preceded
    with m_ , additionally, patched objects are preceded with p_ no object
    under test should be created in setUp this can influence the results.
    """

    def setUp(self):
        super(TestInfluxDBGrafanaTranslator, self).setUp()

        # Patch the module-level CONF so retention periods can be
        # controlled per test without touching real configuration.
        self.p_conf = mock.patch.object(
            influxdb, 'CONF',
            new_callable=mock.PropertyMock)
        self.m_conf = self.p_conf.start()
        self.addCleanup(self.p_conf.stop)

        self.m_conf.grafana_translators.retention_periods = {
            'one_day': 86400,
            'one_week': 604800
        }

    def test_retention_period_one_day(self):
        """Validate lowest retention period"""
        data = copy.copy(self.reference_data)
        # {4} is the positional placeholder for the retention period, so
        # the formatted query equals the selected period name.
        data['query'] = "{4}"

        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=data)

        params = t_influx.build_params()
        self.assertEqual(params['q'], 'one_day')

    def test_retention_period_one_week(self):
        """Validate incrementing retention periods"""
        data = copy.copy(self.reference_data)
        data['query'] = "{4}"
        # 90000s exceeds one_day (86400) so one_week must be selected.
        data['period'] = 90000

        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=data)

        params = t_influx.build_params()
        self.assertEqual(params['q'], 'one_week')

    def test_retention_period_warning(self, m_log):
        """Validate retention period warning"""
        data = copy.copy(self.reference_data)
        data['query'] = "{4}"
        # 650000s exceeds even one_week (604800): expect the fallback to
        # the longest period plus a warning.
        data['period'] = 650000

        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=data)

        params = t_influx.build_params()
        self.assertEqual(params['q'], 'one_week')
        m_log.warning.assert_called_once_with(
            "Longest retention period is to short for desired period")
    test_retention_period_warning = mock.patch.object(influxdb, 'LOG')(
        test_retention_period_warning)

    def test_build_params_granularity(self):
        """Validate build params granularity"""
        data = copy.copy(self.reference_data)
        data['granularity'] = None
        # {3} is the granularity placeholder.
        data['query'] = "{3}"

        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=data)

        raw_results = {
            'db': 'production',
            'epoch': 'ms',
            'q': '1'
        }

        # InfluxDB build_params should replace granularity None optional with 1
        result = t_influx.build_params()
        self.assertEqual(raw_results, result)

    def test_build_params_order(self):
        """Validate order of build params"""
        data = copy.copy(self.reference_data)
        data['aggregate'] = 'count'
        # prevent having to deepcopy by keeping this value the same
        # this will access the value 'hyperion' from the mocked resource object
        data['attribute'] = 'hostname'
        data['period'] = 3
        # because the period is only 3 the retention_period will be one_day
        data['granularity'] = 4
        data['query'] = "{0}{1}{2}{3}{4}"

        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=data)

        raw_results = "counthyperion34one_day"

        result = t_influx.build_params()
        self.assertEqual(raw_results, result['q'])

    def test_extract_results(self):
        """Validate proper result extraction"""
        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=self.reference_data)

        raw_results = "{ \"results\": [{ \"series\": [{ " \
                      "\"columns\": [\"time\",\"mean\"]," \
                      "\"values\": [[1552500855000, " \
                      "67.3550078657577]]}]}]}"

        # Structure of InfluxDB time series data
        # { "results": [{
        #     "statement_id": 0,
        #     "series": [{
        #         "name": "cpu_percent",
        #         "columns": [
        #             "time",
        #             "mean"
        #         ],
        #         "values": [[
        #             1552500855000,
        #             67.3550078657577
        #         ]]
        #     }]
        # }]}

        self.assertEqual(t_influx.extract_result(raw_results),
                         67.3550078657577)

    def test_extract_results_error(self):
        """Validate error on missing results"""
        t_influx = influxdb.InfluxDBGrafanaTranslator(
            data=self.reference_data)

        # An empty object has no 'results' key and must raise.
        raw_results = "{}"

        self.assertRaises(exception.NoSuchMetricForHost,
                          t_influx.extract_result, raw_results)

View File

@@ -0,0 +1,305 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from oslo_log import log
from watcher.common import clients
from watcher.common import exception
from watcher.datasources import grafana
from watcher.tests import base
import requests
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@mock.patch.object(clients.OpenStackClients, 'nova', mock.Mock())
class TestGrafana(base.BaseTestCase):
"""Test the GrafanaHelper datasource
Objects under test are preceded with t_ and mocked objects are preceded
with m_ , additionally, patched objects are preceded with p_ no object
under test should be created in setUp this can influence the results.
"""
def setUp(self):
super(TestGrafana, self).setUp()
self.p_conf = mock.patch.object(
grafana, 'CONF',
new_callable=mock.PropertyMock)
self.m_conf = self.p_conf.start()
self.addCleanup(self.p_conf.stop)
self.m_conf.grafana_client.token = \
"eyJrIjoiT0tTcG1pUlY2RnVKZTFVaDFsNFZXdE9ZWmNrMkZYbk=="
self.m_conf.grafana_client.base_url = "https://grafana.proxy/api/"
self.m_conf.grafana_client.project_id_map = {'host_cpu_usage': 7221}
self.m_conf.grafana_client.database_map = \
{'host_cpu_usage': 'mock_db'}
self.m_conf.grafana_client.attribute_map = \
{'host_cpu_usage': 'hostname'}
self.m_conf.grafana_client.translator_map = \
{'host_cpu_usage': 'influxdb'}
self.m_conf.grafana_client.query_map = \
{'host_cpu_usage': 'SELECT 100-{0}("{0}_value") FROM {3}.'
'cpu_percent WHERE ("host" =~ /^{1}$/ AND '
'"type_instance" =~/^idle$/ AND time > '
'(now()-{2}m)'}
self.m_grafana = grafana.GrafanaHelper(osc=mock.Mock())
stat_agg_patcher = mock.patch.object(
self.m_grafana, 'statistic_aggregation',
spec=grafana.GrafanaHelper.statistic_aggregation)
self.mock_aggregation = stat_agg_patcher.start()
self.addCleanup(stat_agg_patcher.stop)
self.m_compute_node = mock.Mock(
id='16a86790-327a-45f9-bc82-45839f062fdc',
hostname='example.hostname.ch'
)
self.m_instance = mock.Mock(
id='73b1ff78-aca7-404f-ac43-3ed16c1fa555',
human_id='example.hostname'
)
def test_configured(self):
"""Initialize GrafanaHelper and check if configured is true"""
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertTrue(t_grafana.configured)
def test_configured_error(self):
"""Butcher the required configuration and test if configured is false
"""
self.m_conf.grafana_client.base_url = ""
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertFalse(t_grafana.configured)
def test_configured_raise_error(self):
"""Test raising error when using improperly configured GrafanHelper
Assure that the _get_metric method raises errors if the metric is
missing from the map
"""
# Clear the METRIC_MAP of Grafana since it is a static variable that
# other tests might have set before this test runs.
grafana.GrafanaHelper.METRIC_MAP = {}
self.m_conf.grafana_client.base_url = ""
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertFalse(t_grafana.configured)
self.assertEqual({}, t_grafana.METRIC_MAP)
self.assertRaises(
exception.MetricNotAvailable,
t_grafana.get_host_cpu_usage,
self.m_compute_node
)
@mock.patch.object(requests, 'get')
def test_request_raise_error(self, m_request):
"""Test raising error when status code of request indicates problem
Assure that the _request method raises errors if the response indicates
problems.
"""
m_request.return_value = mock.Mock(status_code=404)
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertRaises(
exception.DataSourceNotAvailable,
t_grafana.get_host_cpu_usage,
self.m_compute_node
)
def test_no_metric_raise_error(self):
"""Test raising error when specified meter does not exist"""
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertRaises(exception.MetricNotAvailable,
t_grafana.statistic_aggregation,
self.m_compute_node,
'none existing meter', 60)
@mock.patch.object(grafana.GrafanaHelper, '_request')
def test_get_metric_raise_error(self, m_request):
"""Test raising error when endpoint unable to deliver data for metric
"""
m_request.return_value.content = "{}"
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertRaises(exception.NoSuchMetricForHost,
t_grafana.get_host_cpu_usage,
self.m_compute_node, 60)
def test_metric_builder(self):
"""Creates valid and invalid sets of configuration for metrics
Ensures that a valid metric entry can be configured even if multiple
invalid configurations exist for other metrics.
"""
self.m_conf.grafana_client.project_id_map = {
'host_cpu_usage': 7221,
'host_ram_usage': 7221,
'instance_ram_allocated': 7221,
}
self.m_conf.grafana_client.database_map = {
'host_cpu_usage': 'mock_db',
'instance_cpu_usage': 'mock_db',
'instance_ram_allocated': 'mock_db',
}
self.m_conf.grafana_client.attribute_map = {
'host_cpu_usage': 'hostname',
'host_power': 'hostname',
'instance_ram_allocated': 'human_id',
}
self.m_conf.grafana_client.translator_map = {
'host_cpu_usage': 'influxdb',
'host_inlet_temp': 'influxdb',
# validate that invalid entries don't get added
'instance_ram_usage': 'dummy',
'instance_ram_allocated': 'influxdb',
}
self.m_conf.grafana_client.query_map = {
'host_cpu_usage': 'SHOW SERIES',
'instance_ram_usage': 'SHOW SERIES',
'instance_ram_allocated': 'SHOW SERIES',
}
expected_result = {
'host_cpu_usage': {
'db': 'mock_db',
'project': 7221,
'attribute': 'hostname',
'translator': 'influxdb',
'query': 'SHOW SERIES'},
'instance_ram_allocated': {
'db': 'mock_db',
'project': 7221,
'attribute': 'human_id',
'translator': 'influxdb',
'query': 'SHOW SERIES'},
}
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
self.assertEqual(t_grafana.METRIC_MAP, expected_result)
@mock.patch.object(grafana.GrafanaHelper, '_request')
def test_statistic_aggregation(self, m_request):
m_request.return_value.content = "{ \"results\": [{ \"series\": [{ " \
"\"columns\": [\"time\",\"mean\"]," \
"\"values\": [[1552500855000, " \
"67.3550078657577]]}]}]}"
t_grafana = grafana.GrafanaHelper(osc=mock.Mock())
result = t_grafana.statistic_aggregation(
self.m_compute_node, 'compute_node', 'host_cpu_usage', 60)
self.assertEqual(result, 67.3550078657577)
def test_get_host_cpu_usage(self):
self.m_grafana.get_host_cpu_usage(self.m_compute_node, 60, 'min', 15)
self.mock_aggregation.assert_called_once_with(
self.m_compute_node, 'compute_node', 'host_cpu_usage', 60, 'min',
15)
def test_get_host_ram_usage(self):
self.m_grafana.get_host_ram_usage(self.m_compute_node, 60,
'min', 15)
self.mock_aggregation.assert_called_once_with(
self.m_compute_node, 'compute_node', 'host_ram_usage', 60, 'min',
15)
def test_get_host_outlet_temperature(self):
self.m_grafana.get_host_outlet_temp(self.m_compute_node, 60,
'min', 15)
self.mock_aggregation.assert_called_once_with(
self.m_compute_node, 'compute_node', 'host_outlet_temp', 60, 'min',
15)
def test_get_host_inlet_temperature(self):
    """get_host_inlet_temp delegates to statistic_aggregation."""
    node = self.m_compute_node
    self.m_grafana.get_host_inlet_temp(node, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        node, 'compute_node', 'host_inlet_temp', 60, 'min', 15)
def test_get_host_airflow(self):
    """get_host_airflow delegates to statistic_aggregation unchanged."""
    node = self.m_compute_node
    self.m_grafana.get_host_airflow(node, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        node, 'compute_node', 'host_airflow', 60, 'min', 15)
def test_get_host_power(self):
    """get_host_power delegates to statistic_aggregation unchanged."""
    node = self.m_compute_node
    self.m_grafana.get_host_power(node, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        node, 'compute_node', 'host_power', 60, 'min', 15)
def test_get_instance_cpu_usage(self):
    """get_instance_cpu_usage delegates with resource_type 'instance'."""
    # NOTE(review): the fixture reuses the compute-node mock here even
    # though the metric is instance scoped; the helper only forwards it.
    resource = self.m_compute_node
    self.m_grafana.get_instance_cpu_usage(resource, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        resource, 'instance', 'instance_cpu_usage', 60, 'min', 15)
def test_get_instance_ram_usage(self):
    """get_instance_ram_usage delegates with resource_type 'instance'."""
    resource = self.m_compute_node
    self.m_grafana.get_instance_ram_usage(resource, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        resource, 'instance', 'instance_ram_usage', 60, 'min', 15)
def test_get_instance_ram_allocated(self):
    """get_instance_ram_allocated delegates with resource_type 'instance'."""
    resource = self.m_compute_node
    self.m_grafana.get_instance_ram_allocated(resource, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        resource, 'instance', 'instance_ram_allocated', 60, 'min', 15)
def test_get_instance_l3_cache_usage(self):
    """get_instance_l3_cache_usage delegates with resource_type 'instance'."""
    resource = self.m_compute_node
    self.m_grafana.get_instance_l3_cache_usage(resource, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        resource, 'instance', 'instance_l3_cache_usage', 60, 'min', 15)
def test_get_instance_root_disk_allocated(self):
    """get_instance_root_disk_size delegates with resource_type 'instance'."""
    resource = self.m_compute_node
    self.m_grafana.get_instance_root_disk_size(resource, 60, 'min', 15)
    self.mock_aggregation.assert_called_once_with(
        resource, 'instance', 'instance_root_disk_size', 60, 'min', 15)

View File

@@ -15,12 +15,12 @@
# limitations under the License.
import mock
import six
from mock import MagicMock
from watcher.common import exception
from watcher.datasources import gnocchi
from watcher.datasources import grafana
from watcher.datasources import manager as ds_manager
from watcher.datasources import monasca
from watcher.tests import base
@@ -57,6 +57,31 @@ class TestDataSourceManager(base.BaseTestCase):
backend = manager.get_backend(['host_airflow'])
self.assertEqual("host_fnspid", backend.METRIC_MAP['host_airflow'])
@mock.patch.object(grafana, 'CONF')
def test_metric_file_metric_override_grafana(self, m_config):
    """Grafana requires a different structure in the metric map"""
    m_config.grafana_client.token = (
        "eyJrIjoiT0tTcG1pUlY2RnVKZTFVaDFsNFZXdE9ZWmNrMkZYbk==")
    m_config.grafana_client.base_url = "https://grafana.proxy/api/"
    # Grafana metric-map entries are dicts of five attributes rather
    # than a single metric-name string.
    entry = {
        'db': 'production_cloud',
        'project': '7485',
        'attribute': 'hostname',
        'translator': 'influxdb',
        'query': 'SHOW SERIES',
    }
    load_path = ('watcher.datasources.manager.'
                 'DataSourceManager.load_metric_map')
    loaded_map = {grafana.GrafanaHelper.NAME: {"host_airflow": entry}}
    with mock.patch(load_path, return_value=loaded_map):
        mgr = self._dsm(config=self._dsm_config(datasources=['grafana']))
        backend = mgr.get_backend(['host_airflow'])
        self.assertEqual(entry, backend.METRIC_MAP['host_airflow'])
def test_metric_file_invalid_ds(self):
with mock.patch('yaml.safe_load') as mo:
mo.return_value = {"newds": {"metric_one": "i_am_metric_one"}}
@@ -77,10 +102,8 @@ class TestDataSourceManager(base.BaseTestCase):
def test_get_backend_wrong_metric(self):
    # An unknown metric name must raise MetricNotAvailable.
    manager = self._dsm()
    # NOTE(review): this looks like rendered diff residue — both the
    # older form (capturing the exception and checking its message) and
    # the newer bare assertRaises appear back to back; confirm which
    # variant the file should actually contain.
    ex = self.assertRaises(
        exception.MetricNotAvailable, manager.get_backend,
        ['host_cpu', 'instance_cpu_usage'])
    self.assertIn('Metric: host_cpu not available', six.text_type(ex))
    self.assertRaises(exception.MetricNotAvailable, manager.get_backend,
                      ['host_cpu', 'instance_cpu_usage'])
@mock.patch.object(gnocchi, 'GnocchiHelper')
def test_get_backend_error_datasource(self, m_gnocchi):
@@ -89,6 +112,33 @@ class TestDataSourceManager(base.BaseTestCase):
backend = manager.get_backend(['host_cpu_usage', 'instance_cpu_usage'])
self.assertEqual(backend, manager.ceilometer)
@mock.patch.object(grafana.GrafanaHelper, 'METRIC_MAP',
                   {'host_cpu_usage': 'test'})
def test_get_backend_grafana(self):
    """A metric present in METRIC_MAP selects the grafana backend."""
    cfg = self._dsm_config(
        datasources=['grafana', 'ceilometer', 'gnocchi'])
    mgr = self._dsm(config=cfg)
    self.assertEqual(mgr.get_backend(['host_cpu_usage']), mgr.grafana)
@mock.patch.object(grafana, 'CONF')
def test_dynamic_metric_map_grafana(self, m_config):
    """A metric map built from config options selects the grafana backend."""
    client = m_config.grafana_client
    client.token = "eyJrIjoiT0tTcG1pUlY2RnVKZTFVaDFsNFZXdE9ZWmNrMkZYbk=="
    client.base_url = "https://grafana.proxy/api/"
    # All five per-metric maps must be populated for the runtime
    # metric map to contain the entry.
    client.project_id_map = {'host_cpu_usage': 7221}
    client.attribute_map = {'host_cpu_usage': 'hostname'}
    client.database_map = {'host_cpu_usage': 'mock_db'}
    client.translator_map = {'host_cpu_usage': 'influxdb'}
    client.query_map = {'host_cpu_usage': 'SHOW SERIES'}
    cfg = self._dsm_config(
        datasources=['grafana', 'ceilometer', 'gnocchi'])
    mgr = self._dsm(config=cfg)
    self.assertEqual(mgr.get_backend(['host_cpu_usage']), mgr.grafana)
def test_get_backend_no_datasources(self):
dsmcfg = self._dsm_config(datasources=[])
manager = self._dsm(config=dsmcfg)