Files
watcher/watcher/decision_engine/datasources/grafana.py
licanwei f685bf62ab Don't throw exception when missing metrics
When querying data from datasource, it's possible to miss some data.
In this case if we throw an exception, Audit will failed because of
the exception. We should remove the exception and give the decision
to the strategy.

Change-Id: I1b0e6b78b3bba4df9ba16e093b3910aab1de922e
Closes-Bug: #1847434
2019-10-16 21:01:39 -07:00

255 lines
9.6 KiB
Python

# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log
import six.moves.urllib.parse as urlparse
from watcher.common import clients
from watcher.common import exception
from watcher.decision_engine.datasources import base
from watcher.decision_engine.datasources.grafana_translator import influxdb
import requests
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class GrafanaHelper(base.DataSourceBase):
NAME = 'grafana'
"""METRIC_MAP is only available at runtime _build_metric_map"""
METRIC_MAP = dict()
"""All available translators"""
TRANSLATOR_LIST = [
influxdb.InfluxDBGrafanaTranslator.NAME
]
def __init__(self, osc=None):
""":param osc: an OpenStackClients instance"""
self.osc = osc if osc else clients.OpenStackClients()
self.nova = self.osc.nova()
self.configured = False
self._base_url = None
self._headers = None
self._setup()
def _setup(self):
"""Configure grafana helper to perform requests"""
token = CONF.grafana_client.token
base_url = CONF.grafana_client.base_url
if not token:
LOG.critical("GrafanaHelper authentication token not configured")
return
self._headers = {"Authorization": "Bearer " + token,
"Content-Type": "Application/json"}
if not base_url:
LOG.critical("GrafanaHelper url not properly configured, "
"check base_url")
return
self._base_url = base_url
# Very basic url parsing
parse = urlparse.urlparse(self._base_url)
if parse.scheme is '' or parse.netloc is '' or parse.path is '':
LOG.critical("GrafanaHelper url not properly configured, "
"check base_url and project_id")
return
self._build_metric_map()
if len(self.METRIC_MAP) == 0:
LOG.critical("GrafanaHelper not configured for any metrics")
self.configured = True
def _build_metric_map(self):
"""Builds the metric map by reading config information"""
for key, value in CONF.grafana_client.database_map.items():
try:
project = CONF.grafana_client.project_id_map[key]
attribute = CONF.grafana_client.attribute_map[key]
translator = CONF.grafana_client.translator_map[key]
query = CONF.grafana_client.query_map[key]
if project is not None and \
value is not None and\
translator in self.TRANSLATOR_LIST and\
query is not None:
self.METRIC_MAP[key] = {
'db': value,
'project': project,
'attribute': attribute,
'translator': translator,
'query': query
}
except KeyError as e:
LOG.error(e)
def _build_translator_schema(self, metric, db, attribute, query, resource,
resource_type, period, aggregate,
granularity):
"""Create dictionary to pass to grafana proxy translators"""
return {'metric': metric, 'db': db, 'attribute': attribute,
'query': query, 'resource': resource,
'resource_type': resource_type, 'period': period,
'aggregate': aggregate, 'granularity': granularity}
def _get_translator(self, name, data):
"""Use the names of translators to get the translator for the metric"""
if name == influxdb.InfluxDBGrafanaTranslator.NAME:
return influxdb.InfluxDBGrafanaTranslator(data)
else:
raise exception.InvalidParameter(
parameter='name', parameter_type='grafana translator')
def _request(self, params, project_id):
"""Make the request to the endpoint to retrieve data
If the request fails, determines what error to raise.
"""
if self.configured is False:
raise exception.DataSourceNotAvailable(self.NAME)
resp = requests.get(self._base_url + str(project_id) + '/query',
params=params, headers=self._headers)
if resp.status_code == 200:
return resp
elif resp.status_code == 400:
LOG.error("Query for metric is invalid")
elif resp.status_code == 401:
LOG.error("Authorization token is invalid")
raise exception.DataSourceNotAvailable(self.NAME)
def statistic_aggregation(self, resource=None, resource_type=None,
meter_name=None, period=300, aggregate='mean',
granularity=300):
"""Get the value for the specific metric based on specified parameters
"""
try:
self.METRIC_MAP[meter_name]
except KeyError:
LOG.error("Metric: {0} does not appear in the current Grafana "
"metric map".format(meter_name))
raise exception.MetricNotAvailable(metric=meter_name)
db = self.METRIC_MAP[meter_name]['db']
project = self.METRIC_MAP[meter_name]['project']
attribute = self.METRIC_MAP[meter_name]['attribute']
translator_name = self.METRIC_MAP[meter_name]['translator']
query = self.METRIC_MAP[meter_name]['query']
data = self._build_translator_schema(
meter_name, db, attribute, query, resource, resource_type, period,
aggregate, granularity)
translator = self._get_translator(translator_name, data)
params = translator.build_params()
raw_kwargs = dict(
params=params,
project_id=project,
)
kwargs = {k: v for k, v in raw_kwargs.items() if k and v}
resp = self.query_retry(self._request, **kwargs)
if not resp:
LOG.warning("Datasource {0} is not available.".format(self.NAME))
return
result = translator.extract_result(resp.content)
return result
def get_host_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'compute_node', 'host_cpu_usage', period, aggregate,
granularity)
def get_host_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'compute_node', 'host_ram_usage', period, aggregate,
granularity)
def get_host_outlet_temp(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'compute_node', 'host_outlet_temp', period, aggregate,
granularity)
def get_host_inlet_temp(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'compute_node', 'host_inlet_temp', period, aggregate,
granularity)
def get_host_airflow(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'compute_node', 'host_airflow', period, aggregate,
granularity)
def get_host_power(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'compute_node', 'host_power', period, aggregate,
granularity)
def get_instance_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'instance', 'instance_cpu_usage', period, aggregate,
granularity)
def get_instance_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'instance', 'instance_ram_usage', period, aggregate,
granularity)
def get_instance_ram_allocated(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'instance', 'instance_ram_allocated', period, aggregate,
granularity)
def get_instance_l3_cache_usage(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'instance', 'instance_l3_cache_usage', period, aggregate,
granularity)
def get_instance_root_disk_size(self, resource, period=300,
aggregate="mean", granularity=None):
return self.statistic_aggregation(
resource, 'instance', 'instance_root_disk_size', period, aggregate,
granularity)