Move datasources folder into decision_engine

The datasources are only used by the decision_engine; however, they
are placed in a directory one level higher. This patch moves the
datasources code into the decision_engine folder.

Change-Id: Ia54531fb899b79a59bb77adea079ff27c0d518fa
This commit is contained in:
Dantali0n
2019-07-12 08:54:09 +02:00
parent 233a2b5585
commit 433eabb8d1
23 changed files with 33 additions and 28 deletions

View File

@@ -0,0 +1,227 @@
# -*- encoding: utf-8 -*-
# Copyright 2017 NEC Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import time
from oslo_config import cfg
from oslo_log import log
from watcher.common import exception
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class DataSourceBase(object):
    """Base class for datasources in Watcher

    This base class defines the abstract methods that datasources should
    implement and contains details on the values expected for parameters as
    well as what the values for return types should be.
    """

    # Possible options for the parameters named aggregate
    AGGREGATES = ['mean', 'min', 'max', 'count']

    # Possible options for the parameters named resource_type
    RESOURCE_TYPES = ['compute_node', 'instance', 'bare_metal', 'storage']

    # Each datasource should have a uniquely identifying name
    NAME = ''

    # Possible metrics a datasource can support and their internal name
    METRIC_MAP = dict(host_cpu_usage=None,
                      host_ram_usage=None,
                      host_outlet_temp=None,
                      host_inlet_temp=None,
                      host_airflow=None,
                      host_power=None,
                      instance_cpu_usage=None,
                      instance_ram_usage=None,
                      instance_ram_allocated=None,
                      instance_l3_cache_usage=None,
                      instance_root_disk_size=None,
                      )

    def query_retry(self, f, *args, **kwargs):
        """Attempt to retrieve metrics from the external service

        Attempts to access data from the external service and retries upon
        failure, sleeping query_timeout seconds between the at most
        query_max_retries attempts (both from the watcher_datasources
        configuration group).

        :param f: The method that performs the actual querying for metrics
        :param args: Positional arguments supplied to the method
        :param kwargs: Keyword arguments supplied to the method
        :return: The value as retrieved from the external service
        :raises exception.DataSourceNotAvailable: if every attempt failed
        """
        num_retries = CONF.watcher_datasources.query_max_retries
        timeout = CONF.watcher_datasources.query_timeout
        for attempt in range(num_retries):
            try:
                return f(*args, **kwargs)
            except Exception as e:
                LOG.exception(e)
                # give the datasource a chance to e.g. refresh its client
                # before the next attempt
                self.query_retry_reset(e)
                # lazy %-style args: only interpolated when the record is
                # actually emitted
                LOG.warning("Retry %s of %s while retrieving metrics retry "
                            "in %s seconds", attempt + 1, num_retries,
                            timeout)
                time.sleep(timeout)
        raise exception.DataSourceNotAvailable(datasource=self.NAME)

    @abc.abstractmethod
    def query_retry_reset(self, exception_instance):
        """Abstract method to perform reset operations upon request failure"""
        pass

    @abc.abstractmethod
    def list_metrics(self):
        """Returns the supported metrics that the datasource can retrieve

        :return: List of supported metrics containing keys from METRIC_MAP
        """
        pass

    @abc.abstractmethod
    def check_availability(self):
        """Tries to contact the datasource to see if it is available

        :return: True or False with true meaning the datasource is available
        """
        pass

    @abc.abstractmethod
    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Retrieves and converts metrics based on the specified parameters

        :param resource: Resource object as defined in watcher models such as
                         ComputeNode and Instance
        :param resource_type: Indicates which type of object is supplied
                              to the resource parameter
        :param meter_name: The desired metric to retrieve as key from
                           METRIC_MAP
        :param period: Time span to collect metrics from in seconds
        :param granularity: Interval between samples in measurements in
                            seconds
        :param aggregate: Aggregation method to extract value from set of
                          samples
        :return: The gathered value for the metric the type of value depends
                 on the meter_name
        """
        pass

    @abc.abstractmethod
    def get_host_cpu_usage(self, resource, period, aggregate,
                           granularity=None):
        """Get the cpu usage for a host such as a compute_node

        :return: cpu usage as float ranging between 0 and 100 representing
                 the total cpu usage as percentage
        """
        pass

    @abc.abstractmethod
    def get_host_ram_usage(self, resource, period, aggregate,
                           granularity=None):
        """Get the ram usage for a host such as a compute_node

        :return: ram usage as float in megabytes
        """
        pass

    @abc.abstractmethod
    def get_host_outlet_temp(self, resource, period, aggregate,
                             granularity=None):
        """Get the outlet temperature for a host such as compute_node

        :return: outlet temperature as float in degrees celsius
        """
        pass

    @abc.abstractmethod
    def get_host_inlet_temp(self, resource, period, aggregate,
                            granularity=None):
        """Get the inlet temperature for a host such as compute_node

        :return: inlet temperature as float in degrees celsius
        """
        pass

    @abc.abstractmethod
    def get_host_airflow(self, resource, period, aggregate,
                         granularity=None):
        """Get the airflow for a host such as compute_node

        :return: airflow as float in cfm
        """
        pass

    @abc.abstractmethod
    def get_host_power(self, resource, period, aggregate,
                       granularity=None):
        """Get the power for a host such as compute_node

        :return: power as float in watts
        """
        pass

    @abc.abstractmethod
    def get_instance_cpu_usage(self, resource, period,
                               aggregate, granularity=None):
        """Get the cpu usage for an instance

        :return: cpu usage as float ranging between 0 and 100 representing
                 the total cpu usage as percentage
        """
        pass

    @abc.abstractmethod
    def get_instance_ram_usage(self, resource, period,
                               aggregate, granularity=None):
        """Get the ram usage for an instance

        :return: ram usage as float in megabytes
        """
        pass

    @abc.abstractmethod
    def get_instance_ram_allocated(self, resource, period,
                                   aggregate, granularity=None):
        """Get the ram allocated for an instance

        :return: total ram allocated as float in megabytes
        """
        pass

    @abc.abstractmethod
    def get_instance_l3_cache_usage(self, resource, period,
                                    aggregate, granularity=None):
        """Get the l3 cache usage for an instance

        :return: l3 cache usage as integer in bytes
        """
        pass

    @abc.abstractmethod
    def get_instance_root_disk_size(self, resource, period,
                                    aggregate, granularity=None):
        """Get the size of the root disk for an instance

        :return: root disk size as float in gigabytes
        """
        pass

View File

@@ -0,0 +1,273 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from oslo_log import log
from oslo_utils import timeutils
from watcher._i18n import _
from watcher.common import clients
from watcher.common import exception
from watcher.decision_engine.datasources import base
LOG = log.getLogger(__name__)
try:
from ceilometerclient import exc
HAS_CEILCLIENT = True
except ImportError:
HAS_CEILCLIENT = False
class CeilometerHelper(base.DataSourceBase):
    """Datasource backend for the deprecated Ceilometer API"""

    NAME = 'ceilometer'

    # Mapping of Watcher metric names to Ceilometer meter names
    METRIC_MAP = dict(host_cpu_usage='compute.node.cpu.percent',
                      host_ram_usage='hardware.memory.used',
                      host_outlet_temp='hardware.ipmi.node.outlet_temperature',
                      host_inlet_temp='hardware.ipmi.node.temperature',
                      host_airflow='hardware.ipmi.node.airflow',
                      host_power='hardware.ipmi.node.power',
                      instance_cpu_usage='cpu_util',
                      instance_ram_usage='memory.resident',
                      instance_ram_allocated='memory',
                      instance_l3_cache_usage='cpu_l3_cache',
                      instance_root_disk_size='disk.root.size',
                      )

    def __init__(self, osc=None):
        """:param osc: an OpenStackClients instance"""
        self.osc = osc if osc else clients.OpenStackClients()
        self.ceilometer = self.osc.ceilometer()
        LOG.warning("Ceilometer API is deprecated and Ceilometer Datasource "
                    "module is no longer maintained. We recommend to use "
                    "Gnocchi instead.")

    @staticmethod
    def format_query(user_id, tenant_id, resource_id,
                     user_ids, tenant_ids, resource_ids):
        """Build a list of ceilometer filter expressions

        Single ids take priority over their respective list of ids.
        """
        query = []

        def query_append(query, _id, _ids, field):
            if _id:
                _ids = [_id]
            for x_id in _ids:
                query.append({"field": field, "op": "eq", "value": x_id})

        query_append(query, user_id, (user_ids or []), "user_id")
        query_append(query, tenant_id, (tenant_ids or []), "project_id")
        query_append(query, resource_id, (resource_ids or []), "resource_id")
        return query

    def _timestamps(self, start_time, end_time):
        """Convert the boundaries to ISO 8601 timestamp strings

        :return: tuple of (start_timestamp, end_timestamp), either may be
                 None when the corresponding input was falsy
        :raises exception.Invalid: when start is later than end
        """

        def _format_timestamp(_time):
            if _time:
                if isinstance(_time, datetime.datetime):
                    return _time.isoformat()
                return _time
            return None

        start_timestamp = _format_timestamp(start_time)
        end_timestamp = _format_timestamp(end_time)

        if ((start_timestamp is not None) and (end_timestamp is not None) and
                (timeutils.parse_isotime(start_timestamp) >
                 timeutils.parse_isotime(end_timestamp))):
            raise exception.Invalid(
                _("Invalid query: %(start_time)s > %(end_time)s") % dict(
                    start_time=start_timestamp, end_time=end_timestamp))
        return start_timestamp, end_timestamp

    def build_query(self, user_id=None, tenant_id=None, resource_id=None,
                    user_ids=None, tenant_ids=None, resource_ids=None,
                    start_time=None, end_time=None):
        """Returns query built from given parameters.

        This query can be then used for querying resources, meters and
        statistics.

        :param user_id: user_id, has a priority over list of ids
        :param tenant_id: tenant_id, has a priority over list of ids
        :param resource_id: resource_id, has a priority over list of ids
        :param user_ids: list of user_ids
        :param tenant_ids: list of tenant_ids
        :param resource_ids: list of resource_ids
        :param start_time: datetime from which measurements should be collected
        :param end_time: datetime until which measurements should be collected
        """
        query = self.format_query(user_id, tenant_id, resource_id,
                                  user_ids, tenant_ids, resource_ids)

        start_timestamp, end_timestamp = self._timestamps(start_time,
                                                          end_time)

        if start_timestamp:
            query.append({"field": "timestamp", "op": "ge",
                          "value": start_timestamp})
        if end_timestamp:
            query.append({"field": "timestamp", "op": "le",
                          "value": end_timestamp})
        return query

    def query_retry_reset(self, exception_instance):
        """Re-authenticate the ceilometer client after an auth failure"""
        if isinstance(exception_instance, exc.HTTPUnauthorized):
            self.osc.reset_clients()
            self.ceilometer = self.osc.ceilometer()

    def list_metrics(self):
        """List the user's meters

        :return: list of meters, or an empty set when retrieval failed
        """
        try:
            meters = self.query_retry(f=self.ceilometer.meters.list)
        except Exception:
            return set()
        else:
            return meters

    def check_availability(self):
        """Check if ceilometer can be contacted

        :return: 'available' or 'not available'
        """
        try:
            self.query_retry(self.ceilometer.resources.list)
        except Exception:
            return 'not available'
        return 'available'

    def query_sample(self, meter_name, query, limit=1):
        """Retrieve samples for the given meter and query filter"""
        return self.query_retry(f=self.ceilometer.samples.list,
                                meter_name=meter_name,
                                limit=limit,
                                q=query)

    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Retrieve and convert metrics based on the specified parameters

        See DataSourceBase.statistic_aggregation for parameter details.
        NOTE: parameter order now matches DataSourceBase (aggregate before
        granularity); previously the order was swapped here so the
        positional calls from the get_* helpers passed aggregate into
        granularity and vice versa. granularity is accepted for interface
        compatibility but unused by ceilometer.

        :raises exception.MetricNotAvailable: for unknown meter_name
        """
        end_time = datetime.datetime.utcnow()
        start_time = end_time - datetime.timedelta(seconds=int(period))

        meter = self.METRIC_MAP.get(meter_name)
        if meter is None:
            raise exception.MetricNotAvailable(metric=meter_name)

        if aggregate == 'mean':
            # 'avg' is ceilometer's name for the mean aggregate
            aggregate = 'avg'
        elif aggregate == 'count':
            aggregate = 'avg'
            LOG.warning('aggregate type count not supported by ceilometer,'
                        ' replaced with mean.')

        resource_id = resource.uuid
        if resource_type == 'compute_node':
            resource_id = "%s_%s" % (resource.uuid, resource.hostname)

        query = self.build_query(
            resource_id=resource_id, start_time=start_time, end_time=end_time)
        statistic = self.query_retry(f=self.ceilometer.statistics.list,
                                     meter_name=meter,
                                     q=query,
                                     period=period,
                                     aggregates=[
                                         {'func': aggregate}])

        item_value = None
        if statistic:
            item_value = statistic[-1]._info.get('aggregate').get(aggregate)
            # == instead of `is`: identity comparison of str literals is an
            # interning accident. Guard against a missing aggregate key
            # (item_value None) before multiplying.
            if item_value is not None and meter_name == 'host_airflow':
                # Airflow from hardware.ipmi.node.airflow is reported as
                # 1/10 th of actual CFM
                item_value *= 10
        return item_value

    # The helpers below simply delegate to statistic_aggregation; see the
    # DataSourceBase docstrings for parameter and return value details.

    def get_host_cpu_usage(self, resource, period,
                           aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_cpu_usage', period,
            aggregate, granularity)

    def get_host_ram_usage(self, resource, period,
                           aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_ram_usage', period,
            aggregate, granularity)

    def get_host_outlet_temp(self, resource, period,
                             aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_outlet_temp', period,
            aggregate, granularity)

    def get_host_inlet_temp(self, resource, period,
                            aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_inlet_temp', period,
            aggregate, granularity)

    def get_host_airflow(self, resource, period,
                         aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_airflow', period,
            aggregate, granularity)

    def get_host_power(self, resource, period,
                       aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_power', period,
            aggregate, granularity)

    def get_instance_cpu_usage(self, resource, period,
                               aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_cpu_usage', period,
            aggregate, granularity)

    def get_instance_ram_usage(self, resource, period,
                               aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_usage', period,
            aggregate, granularity)

    def get_instance_ram_allocated(self, resource, period,
                                   aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_allocated', period,
            aggregate, granularity)

    def get_instance_l3_cache_usage(self, resource, period,
                                    aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_l3_cache_usage', period,
            aggregate, granularity)

    def get_instance_root_disk_size(self, resource, period,
                                    aggregate, granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_root_disk_size', period,
            aggregate, granularity)

View File

@@ -0,0 +1,200 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2017 Servionica
#
# Authors: Alexander Chadin <a.chadin@servionica.ru>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from datetime import timedelta
from oslo_config import cfg
from oslo_log import log
from watcher.common import clients
from watcher.common import exception
from watcher.decision_engine.datasources import base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class GnocchiHelper(base.DataSourceBase):
    """Datasource backend for the gnocchi time series database"""

    NAME = 'gnocchi'

    # Mapping of Watcher metric names to gnocchi metric names
    METRIC_MAP = dict(host_cpu_usage='compute.node.cpu.percent',
                      host_ram_usage='hardware.memory.used',
                      host_outlet_temp='hardware.ipmi.node.outlet_temperature',
                      host_inlet_temp='hardware.ipmi.node.temperature',
                      host_airflow='hardware.ipmi.node.airflow',
                      host_power='hardware.ipmi.node.power',
                      instance_cpu_usage='cpu_util',
                      instance_ram_usage='memory.resident',
                      instance_ram_allocated='memory',
                      instance_l3_cache_usage='cpu_l3_cache',
                      instance_root_disk_size='disk.root.size',
                      )

    def __init__(self, osc=None):
        """:param osc: an OpenStackClients instance"""
        self.osc = osc if osc else clients.OpenStackClients()
        self.gnocchi = self.osc.gnocchi()

    def check_availability(self):
        """Check if gnocchi can be contacted

        :return: 'available' or 'not available'
        """
        try:
            self.query_retry(self.gnocchi.status.get)
        except Exception:
            return 'not available'
        return 'available'

    def list_metrics(self):
        """List the user's meters

        :return: set of metric names, or an empty set when retrieval failed
        """
        try:
            response = self.query_retry(f=self.gnocchi.metric.list)
        except Exception:
            return set()
        else:
            return set([metric['name'] for metric in response])

    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Retrieve and convert metrics based on the specified parameters

        See DataSourceBase.statistic_aggregation for parameter details.

        :raises exception.MetricNotAvailable: for unknown meter_name
        :raises exception.ResourceNotFound: when gnocchi does not know the
            supplied resource
        """
        stop_time = datetime.utcnow()
        start_time = stop_time - timedelta(seconds=(int(period)))

        meter = self.METRIC_MAP.get(meter_name)
        if meter is None:
            raise exception.MetricNotAvailable(metric=meter_name)

        if aggregate == 'count':
            aggregate = 'mean'
            LOG.warning('aggregate type count not supported by gnocchi,'
                        ' replaced with mean.')

        resource_id = resource.uuid
        if resource_type == 'compute_node':
            resource_id = "%s_%s" % (resource.uuid, resource.hostname)

        # translate the watcher resource id into the gnocchi resource id
        kwargs = dict(query={"=": {"original_resource_id": resource_id}},
                      limit=1)
        resources = self.query_retry(
            f=self.gnocchi.resource.search, **kwargs)

        if not resources:
            raise exception.ResourceNotFound(name='gnocchi',
                                             id=resource_id)

        resource_id = resources[0]['id']
        raw_kwargs = dict(
            metric=meter,
            start=start_time,
            stop=stop_time,
            resource_id=resource_id,
            granularity=granularity,
            aggregation=aggregate,
        )
        # drop parameters that do not have a value
        kwargs = {k: v for k, v in raw_kwargs.items() if k and v}

        statistics = self.query_retry(
            f=self.gnocchi.metric.get_measures, **kwargs)

        if statistics:
            # return value of latest measure
            # measure has structure [time, granularity, value]
            return_value = statistics[-1][2]

            # == instead of `is`: identity comparison of str literals only
            # works by accident of CPython interning
            if meter_name == 'host_airflow':
                # Airflow from hardware.ipmi.node.airflow is reported as
                # 1/10 th of actual CFM
                return_value *= 10
            return return_value

    # The helpers below simply delegate to statistic_aggregation; see the
    # DataSourceBase docstrings for parameter and return value details.

    def get_host_cpu_usage(self, resource, period, aggregate,
                           granularity=300):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_cpu_usage', period,
            aggregate, granularity)

    def get_host_ram_usage(self, resource, period, aggregate,
                           granularity=300):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_ram_usage', period,
            aggregate, granularity)

    def get_host_outlet_temp(self, resource, period, aggregate,
                             granularity=300):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_outlet_temp', period,
            aggregate, granularity)

    def get_host_inlet_temp(self, resource, period, aggregate,
                            granularity=300):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_inlet_temp', period,
            aggregate, granularity)

    def get_host_airflow(self, resource, period, aggregate,
                         granularity=300):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_airflow', period,
            aggregate, granularity)

    def get_host_power(self, resource, period, aggregate,
                       granularity=300):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_power', period,
            aggregate, granularity)

    def get_instance_cpu_usage(self, resource, period, aggregate,
                               granularity=300):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_cpu_usage', period,
            aggregate, granularity)

    def get_instance_ram_usage(self, resource, period, aggregate,
                               granularity=300):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_usage', period,
            aggregate, granularity)

    def get_instance_ram_allocated(self, resource, period, aggregate,
                                   granularity=300):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_allocated', period,
            aggregate, granularity)

    def get_instance_l3_cache_usage(self, resource, period, aggregate,
                                    granularity=300):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_l3_cache_usage', period,
            aggregate, granularity)

    def get_instance_root_disk_size(self, resource, period, aggregate,
                                    granularity=300):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_root_disk_size', period,
            aggregate, granularity)

View File

@@ -0,0 +1,251 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log
import six.moves.urllib.parse as urlparse
from watcher.common import clients
from watcher.common import exception
from watcher.decision_engine.datasources import base
from watcher.decision_engine.datasources.grafana_translator import influxdb
import requests
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class GrafanaHelper(base.DataSourceBase):
    """Datasource that queries databases proxied behind a Grafana server"""

    NAME = 'grafana'

    # METRIC_MAP is only available at runtime, built by _build_metric_map
    # from the grafana_client configuration group
    METRIC_MAP = dict()

    # All available translators
    TRANSLATOR_LIST = [
        influxdb.InfluxDBGrafanaTranslator.NAME
    ]

    def __init__(self, osc=None):
        """:param osc: an OpenStackClients instance"""
        self.osc = osc if osc else clients.OpenStackClients()
        self.nova = self.osc.nova()
        self.configured = False
        self._base_url = None
        self._headers = None
        self._setup()

    def _setup(self):
        """Configure grafana helper to perform requests

        Leaves self.configured False (so _request raises) when the token,
        base_url or metric map configuration is incomplete.
        """
        token = CONF.grafana_client.token
        base_url = CONF.grafana_client.base_url

        if not token:
            LOG.critical("GrafanaHelper authentication token not configured")
            return
        self._headers = {"Authorization": "Bearer " + token,
                         "Content-Type": "Application/json"}

        if not base_url:
            LOG.critical("GrafanaHelper url not properly configured, "
                         "check base_url")
            return
        self._base_url = base_url

        # Very basic url parsing. Truthiness tests instead of the previous
        # `is ''` identity comparisons, which are unreliable for strings
        # built at runtime.
        parse = urlparse.urlparse(self._base_url)
        if not parse.scheme or not parse.netloc or not parse.path:
            LOG.critical("GrafanaHelper url not properly configured, "
                         "check base_url and project_id")
            return

        self._build_metric_map()
        if not self.METRIC_MAP:
            LOG.critical("GrafanaHelper not configured for any metrics")

        self.configured = True

    def _build_metric_map(self):
        """Builds the metric map by reading config information"""
        for key, value in CONF.grafana_client.database_map.items():
            try:
                project = CONF.grafana_client.project_id_map[key]
                attribute = CONF.grafana_client.attribute_map[key]
                translator = CONF.grafana_client.translator_map[key]
                query = CONF.grafana_client.query_map[key]
                if project is not None and \
                        value is not None and\
                        translator in self.TRANSLATOR_LIST and\
                        query is not None:
                    self.METRIC_MAP[key] = {
                        'db': value,
                        'project': project,
                        'attribute': attribute,
                        'translator': translator,
                        'query': query
                    }
            except KeyError as e:
                # a metric missing from one of the maps is skipped entirely
                LOG.error(e)

    def _build_translator_schema(self, metric, db, attribute, query, resource,
                                 resource_type, period, aggregate,
                                 granularity):
        """Create dictionary to pass to grafana proxy translators"""
        return {'metric': metric, 'db': db, 'attribute': attribute,
                'query': query, 'resource': resource,
                'resource_type': resource_type, 'period': period,
                'aggregate': aggregate, 'granularity': granularity}

    def _get_translator(self, name, data):
        """Use the names of translators to get the translator for the metric

        :raises exception.InvalidParameter: for an unknown translator name
        """
        if name == influxdb.InfluxDBGrafanaTranslator.NAME:
            return influxdb.InfluxDBGrafanaTranslator(data)
        else:
            raise exception.InvalidParameter(
                parameter='name', parameter_type='grafana translator')

    def _request(self, params, project_id):
        """Make the request to the endpoint to retrieve data

        If the request fails, determines what error to raise.

        :raises exception.DataSourceNotAvailable: when the helper is not
            configured or the endpoint rejects the request
        """
        if self.configured is False:
            # pass datasource= so the exception message formats properly,
            # consistent with DataSourceBase.query_retry
            raise exception.DataSourceNotAvailable(datasource=self.NAME)

        resp = requests.get(self._base_url + str(project_id) + '/query',
                            params=params, headers=self._headers)
        if resp.status_code == 200:
            return resp
        elif resp.status_code == 400:
            LOG.error("Query for metric is invalid")
        elif resp.status_code == 401:
            LOG.error("Authorization token is invalid")
        raise exception.DataSourceNotAvailable(datasource=self.NAME)

    def statistic_aggregation(self, resource=None, resource_type=None,
                              meter_name=None, period=300, aggregate='mean',
                              granularity=300):
        """Get the value for the specific metric based on specified parameters

        See DataSourceBase.statistic_aggregation for parameter details.

        :raises exception.MetricNotAvailable: when meter_name is not in the
            metric map built from configuration
        """
        try:
            # single lookup instead of re-indexing METRIC_MAP per field
            metric = self.METRIC_MAP[meter_name]
        except KeyError:
            LOG.error("Metric: %s does not appear in the current Grafana "
                      "metric map", meter_name)
            raise exception.MetricNotAvailable(metric=meter_name)

        data = self._build_translator_schema(
            meter_name, metric['db'], metric['attribute'], metric['query'],
            resource, resource_type, period, aggregate, granularity)

        translator = self._get_translator(metric['translator'], data)

        params = translator.build_params()

        raw_kwargs = dict(
            params=params,
            project_id=metric['project'],
        )
        # drop parameters that do not have a value
        kwargs = {k: v for k, v in raw_kwargs.items() if k and v}
        resp = self.query_retry(self._request, **kwargs)

        return translator.extract_result(resp.content)

    # The helpers below simply delegate to statistic_aggregation; see the
    # DataSourceBase docstrings for parameter and return value details.

    def get_host_cpu_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_cpu_usage', period, aggregate,
            granularity)

    def get_host_ram_usage(self, resource, period=300,
                           aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_ram_usage', period, aggregate,
            granularity)

    def get_host_outlet_temp(self, resource, period=300,
                             aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_outlet_temp', period, aggregate,
            granularity)

    def get_host_inlet_temp(self, resource, period=300,
                            aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_inlet_temp', period, aggregate,
            granularity)

    def get_host_airflow(self, resource, period=300,
                         aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_airflow', period, aggregate,
            granularity)

    def get_host_power(self, resource, period=300,
                       aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'compute_node', 'host_power', period, aggregate,
            granularity)

    def get_instance_cpu_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_cpu_usage', period, aggregate,
            granularity)

    def get_instance_ram_usage(self, resource, period=300,
                               aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_usage', period, aggregate,
            granularity)

    def get_instance_ram_allocated(self, resource, period=300,
                                   aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_ram_allocated', period, aggregate,
            granularity)

    def get_instance_l3_cache_usage(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_l3_cache_usage', period, aggregate,
            granularity)

    def get_instance_root_disk_size(self, resource, period=300,
                                    aggregate="mean", granularity=None):
        return self.statistic_aggregation(
            resource, 'instance', 'instance_root_disk_size', period, aggregate,
            granularity)

View File

@@ -0,0 +1,125 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.datasources import base
class BaseGrafanaTranslator(object):
    """Grafana translator baseclass to use with grafana for different databases

    Specific databases that are proxied through grafana require some
    alterations depending on the database.

    The data dictionary supplied to the constructor has the structure::

        data {
            metric: name of the metric as found in
                DataSourceBase.METRIC_MAP,
            db: database specified for this metric in grafana_client
                config options,
            attribute: the piece of information that will be selected
                from the resource object to build the query,
            query: the unformatted query from the configuration for this
                metric,
            resource: the object from the OpenStackClient,
            resource_type: the type of the resource
                ['compute_node', 'instance', 'bare_metal', 'storage'],
            period: the period of time to collect metrics for in seconds,
            aggregate: the aggregation can be any from ['mean', 'max',
                'min', 'count'],
            granularity: interval between datapoints in seconds (optional),
        }
    """

    # Every grafana translator should have a uniquely identifying name
    NAME = ''

    RESOURCE_TYPES = base.DataSourceBase.RESOURCE_TYPES

    AGGREGATES = base.DataSourceBase.AGGREGATES

    def __init__(self, data):
        """:param data: dictionary with the structure documented above"""
        self._data = data
        self._validate_data()

    def _validate_data(self):
        """Iterate through the supplied data and verify attributes

        :raises exception.InvalidParameter: when a non-optional key has no
            value or resource_type / aggregate has an unsupported value
        """
        optionals = ['granularity']
        reference_data = {
            'metric': None,
            'db': None,
            'attribute': None,
            'query': None,
            'resource': None,
            'resource_type': None,
            'period': None,
            'aggregate': None,
            'granularity': None
        }

        reference_data.update(self._data)
        for key, value in reference_data.items():
            if value is None and key not in optionals:
                raise exception.InvalidParameter(
                    message=(_("The value %(value)s for parameter "
                               "%(parameter)s is invalid") % {'value': None,
                                                              'parameter': key}
                             )
                )

        if reference_data['resource_type'] not in self.RESOURCE_TYPES:
            raise exception.InvalidParameter(parameter='resource_type',
                                             parameter_type='RESOURCE_TYPES')

        if reference_data['aggregate'] not in self.AGGREGATES:
            raise exception.InvalidParameter(parameter='aggregate',
                                             parameter_type='AGGREGATES')

    @staticmethod
    def _extract_attribute(resource, attribute):
        """Retrieve the desired attribute from the resource

        :param resource: The resource object to extract the attribute from.
        :param attribute: The name of the attribute to extract as string.
        :return: The extracted attribute
        :raises AttributeError: when the resource lacks the attribute
        """
        # The previous try/except only re-raised AttributeError; letting it
        # propagate directly is equivalent and simpler.
        return getattr(resource, attribute)

    @staticmethod
    def _query_format(query, aggregate, resource, period,
                      granularity, translator_specific):
        """Fill the query template with the supplied values in order"""
        return query.format(aggregate, resource, period, granularity,
                            translator_specific)

    @abc.abstractmethod
    def build_params(self):
        """Build the set of parameters to send with the request"""
        raise NotImplementedError()

    @abc.abstractmethod
    def extract_result(self, raw_results):
        """Extrapolate the metric from the raw results of the request"""
        raise NotImplementedError()

View File

@@ -0,0 +1,88 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2019 European Organization for Nuclear Research (CERN)
#
# Authors: Corne Lukken <info@dantalion.nl>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
from watcher.common import exception
from watcher.decision_engine.datasources.grafana_translator.base import \
BaseGrafanaTranslator
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class InfluxDBGrafanaTranslator(BaseGrafanaTranslator):
    """Grafana translator to communicate with InfluxDB database"""

    NAME = 'influxdb'

    def __init__(self, data):
        super(InfluxDBGrafanaTranslator, self).__init__(data)

    def build_params(self):
        """Build the set of parameters to send with the request

        Selects the shortest configured retention period that still covers
        the desired period and interpolates the configured query template.

        :return: dict of InfluxDB HTTP API query parameters
            (``db``, ``epoch`` and ``q``)
        :raises AttributeError: if the resource lacks the configured
            attribute
        """
        data = self._data

        retention_period = None
        available_periods = CONF.grafana_translators.retention_periods.items()
        # Iterate retention periods from shortest to longest and pick the
        # first one that can span the requested period.
        for key, value in sorted(available_periods, key=lambda x: x[1]):
            if int(data['period']) < int(value):
                retention_period = key
                break

        if retention_period is None:
            # BUGFIX: pick the retention period with the largest duration
            # (the value); a plain max() over dict items compared the
            # period *names* alphabetically instead.
            retention_period = max(
                available_periods, key=lambda x: x[1])[0]
            LOG.warning("Longest retention period is too short for desired"
                        " period")

        try:
            resource = self._extract_attribute(
                data['resource'], data['attribute'])
        except AttributeError:
            LOG.error("Resource: {0} does not contain attribute {1}".format(
                data['resource'], data['attribute']))
            raise

        # Granularity is optional; if it is None the minimal value for
        # InfluxDB will be 1.
        granularity = \
            data['granularity'] if data['granularity'] is not None else 1

        return {'db': data['db'],
                'epoch': 'ms',
                'q': self._query_format(
                    data['query'], data['aggregate'], resource,
                    data['period'], granularity, retention_period)}

    def extract_result(self, raw_results):
        """Extract the metric value from the raw InfluxDB response

        :param raw_results: JSON body returned by the InfluxDB HTTP API
        :return: the value of the requested aggregate for the resource
        :raises NoSuchMetricForHost: if the response does not contain the
            expected series for the resource
        """
        try:
            # For result structure see:
            # https://docs.openstack.org/watcher/latest/datasources/grafana.html#InfluxDB
            result = jsonutils.loads(raw_results)
            result = result['results'][0]['series'][0]
            index_aggregate = result['columns'].index(self._data['aggregate'])
            return result['values'][0][index_aggregate]
        # An empty 'results' or 'series' list raises IndexError rather
        # than KeyError, so treat both as a missing metric.
        except (KeyError, IndexError):
            LOG.error("Could not extract {0} for the resource: {1}".format(
                self._data['metric'], self._data['resource']))
            raise exception.NoSuchMetricForHost(
                metric=self._data['metric'], host=self._data['resource'])

View File

@@ -0,0 +1,156 @@
# -*- encoding: utf-8 -*-
# Copyright 2017 NEC Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import yaml
from collections import OrderedDict
from oslo_config import cfg
from oslo_log import log
from watcher.common import exception
from watcher.decision_engine.datasources import ceilometer as ceil
from watcher.decision_engine.datasources import gnocchi as gnoc
from watcher.decision_engine.datasources import grafana as graf
from watcher.decision_engine.datasources import monasca as mon
LOG = log.getLogger(__name__)
class DataSourceManager(object):
    """Select and construct the datasource helpers Watcher strategies use."""

    metric_map = OrderedDict([
        (gnoc.GnocchiHelper.NAME, gnoc.GnocchiHelper.METRIC_MAP),
        (ceil.CeilometerHelper.NAME, ceil.CeilometerHelper.METRIC_MAP),
        (mon.MonascaHelper.NAME, mon.MonascaHelper.METRIC_MAP),
        (graf.GrafanaHelper.NAME, graf.GrafanaHelper.METRIC_MAP),
    ])
    """Dictionary with all possible datasources, dictionary order is the default
    order for attempting to use datasources
    """

    def __init__(self, config=None, osc=None):
        """Initialize the manager and merge any metric map overrides.

        :param config: configuration object exposing the ordered
            ``datasources`` list
        :param osc: an OpenStackClients instance used to build the
            individual datasource helpers on demand
        """
        self.osc = osc
        self.config = config
        self._ceilometer = None
        self._monasca = None
        self._gnocchi = None
        self._grafana = None

        # Dynamically update grafana metric map, only available at runtime
        # The metric map can still be overridden by a yaml config file
        self.metric_map[graf.GrafanaHelper.NAME] = self.grafana.METRIC_MAP
        metric_map_path = cfg.CONF.watcher_decision_engine.metric_map_path
        metrics_from_file = self.load_metric_map(metric_map_path)
        for ds, mp in self.metric_map.items():
            try:
                self.metric_map[ds].update(metrics_from_file.get(ds, {}))
            except KeyError:
                msgargs = (ds, self.metric_map.keys())
                LOG.warning('Invalid Datasource: %s. Allowed: %s ', *msgargs)

        self.datasources = self.config.datasources

    @property
    def ceilometer(self):
        # Lazily construct the helper; assign the backing attribute
        # directly for consistency with the sibling properties.
        if self._ceilometer is None:
            self._ceilometer = ceil.CeilometerHelper(osc=self.osc)
        return self._ceilometer

    @ceilometer.setter
    def ceilometer(self, ceilometer):
        self._ceilometer = ceilometer

    @property
    def monasca(self):
        if self._monasca is None:
            self._monasca = mon.MonascaHelper(osc=self.osc)
        return self._monasca

    @monasca.setter
    def monasca(self, monasca):
        self._monasca = monasca

    @property
    def gnocchi(self):
        if self._gnocchi is None:
            self._gnocchi = gnoc.GnocchiHelper(osc=self.osc)
        return self._gnocchi

    @gnocchi.setter
    def gnocchi(self, gnocchi):
        self._gnocchi = gnocchi

    @property
    def grafana(self):
        if self._grafana is None:
            self._grafana = graf.GrafanaHelper(osc=self.osc)
        return self._grafana

    @grafana.setter
    def grafana(self, grafana):
        self._grafana = grafana

    def get_backend(self, metrics):
        """Determine the datasource to use from the configuration

        Iterates over the configured datasources in order to find the first
        which can support all specified metrics. Upon a missing metric the
        next datasource is attempted.

        :param metrics: non-empty list of metric names a strategy requires
        :return: the first datasource helper supporting every metric
        :raises NoDatasourceAvailable: if no datasources are configured
        :raises InvalidParameter: if no metrics are specified
        :raises MetricNotAvailable: if no datasource supports all metrics
        """
        # NOTE: '`len(x) is 0`' identity checks replaced with truthiness
        # tests; comparing ints with 'is' relies on CPython interning.
        if not self.datasources:
            raise exception.NoDatasourceAvailable

        if not metrics:
            LOG.critical("Can not retrieve datasource without specifying "
                         "list of required metrics.")
            raise exception.InvalidParameter(parameter='metrics',
                                             parameter_type='none empty list')

        for datasource in self.datasources:
            no_metric = False
            for metric in metrics:
                if (metric not in self.metric_map[datasource] or
                        self.metric_map[datasource].get(metric) is None):
                    no_metric = True
                    LOG.warning("Datasource: {0} could not be used due to "
                                "metric: {1}".format(datasource, metric))
                    break
            if not no_metric:
                # Try to use a specific datasource but attempt additional
                # datasources upon exceptions (if config has more datasources)
                try:
                    ds = getattr(self, datasource)
                    ds.METRIC_MAP.update(self.metric_map[ds.NAME])
                    return ds
                except Exception:
                    pass
        raise exception.MetricNotAvailable(metric=metric)

    def load_metric_map(self, file_path):
        """Load metric overrides from the metric_map_path

        :param file_path: path to a yaml file mapping datasource names to
            metric-name overrides; may be None or non-existent
        :return: the parsed mapping, or {} when the file is missing,
            empty or invalid
        """
        if file_path and os.path.exists(file_path):
            with open(file_path, 'r') as f:
                try:
                    ret = yaml.safe_load(f.read())
                    # return {} if the file is empty
                    return ret if ret else {}
                except yaml.YAMLError as e:
                    LOG.warning('Could not load %s: %s', file_path, e)
                    return {}
        else:
            return {}

View File

@@ -0,0 +1,171 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Authors: Vincent FRANCOISE <vincent.francoise@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from monascaclient import exc
from watcher.common import clients
from watcher.common import exception
from watcher.decision_engine.datasources import base
class MonascaHelper(base.DataSourceBase):
    """Monasca implementation of the Watcher datasource abstraction.

    Maps Watcher's generic metric names onto Monasca metric names and
    retrieves aggregated measurements through the Monasca client.
    """

    NAME = 'monasca'

    # Watcher generic metric name -> Monasca metric name.
    # A None value means the metric is not available from Monasca.
    METRIC_MAP = dict(host_cpu_usage='cpu.percent',
                      host_ram_usage=None,
                      host_outlet_temp=None,
                      host_inlet_temp=None,
                      host_airflow=None,
                      host_power=None,
                      instance_cpu_usage='vm.cpu.utilization_perc',
                      instance_ram_usage=None,
                      instance_ram_allocated=None,
                      instance_l3_cache_usage=None,
                      instance_root_disk_size=None,
                      )
def __init__(self, osc=None):
    """Set up the Monasca client.

    :param osc: an OpenStackClients instance; a new one is created when
        omitted
    """
    self.osc = osc or clients.OpenStackClients()
    self.monasca = self.osc.monasca()
def _format_time_params(self, start_time, end_time, period):
    """Format time-related params to the correct Monasca format

    :param start_time: start datetime from which metrics will be used
    :param end_time: end datetime up to which metrics will be used
    :param period: interval in seconds (int); defaults to three hours
    :return: (start ISO time or None, end ISO time or None, period)
    """
    # Default window is the last three hours.
    period = period or int(datetime.timedelta(hours=3).total_seconds())
    if not start_time:
        start_time = (datetime.datetime.utcnow() -
                      datetime.timedelta(seconds=period))

    start_timestamp = start_time.isoformat() if start_time else None
    end_timestamp = end_time.isoformat() if end_time else None

    return start_timestamp, end_timestamp, period
def query_retry_reset(self, exception_instance):
    """Rebuild the Monasca client after an authorization failure.

    :param exception_instance: the exception raised by the failed query
    """
    if not isinstance(exception_instance, exc.Unauthorized):
        return
    # Stale token: rebuild all clients and grab a fresh Monasca handle.
    self.osc.reset_clients()
    self.monasca = self.osc.monasca()
def check_availability(self):
    """Probe the Monasca endpoint with a metric listing.

    :return: 'available' if the endpoint responds, otherwise
        'not available'
    """
    status = 'available'
    try:
        self.query_retry(self.monasca.metrics.list)
    except Exception:
        status = 'not available'
    return status
def list_metrics(self):
    """List the metrics available from Monasca.

    Currently a stub that returns None.
    """
    # TODO(alexchadin): this method should be implemented in accordance to
    # monasca API.
    pass
def statistic_aggregation(self, resource=None, resource_type=None,
                          meter_name=None, period=300, aggregate='mean',
                          granularity=300):
    """Aggregate a metric for a resource over the most recent period.

    :param resource: resource object; its ``uuid`` is used as the
        Monasca ``hostname`` dimension
    :param resource_type: unused; kept for interface compatibility
    :param meter_name: generic Watcher metric name (see METRIC_MAP)
    :param period: look-back window in seconds
    :param aggregate: aggregation function ('mean' is mapped to 'avg')
    :param granularity: unused; kept for interface compatibility
    :return: the averaged statistic as float, or None when Monasca
        returned no statistics
    :raises MetricNotAvailable: if Monasca has no mapping for the metric
    """
    end = datetime.datetime.utcnow()
    begin = end - datetime.timedelta(seconds=int(period))

    meter = self.METRIC_MAP.get(meter_name)
    if meter is None:
        raise exception.MetricNotAvailable(metric=meter_name)

    # Monasca names the mean statistic 'avg'.
    if aggregate == 'mean':
        aggregate = 'avg'

    raw_kwargs = dict(
        name=meter,
        start_time=begin.isoformat(),
        end_time=end.isoformat(),
        dimensions={'hostname': resource.uuid},
        period=period,
        statistics=aggregate,
        group_by='*',
    )
    # Drop any entries whose value is unset.
    kwargs = {k: v for k, v in raw_kwargs.items() if k and v}

    statistics = self.query_retry(
        f=self.monasca.metrics.list_statistics, **kwargs)

    result = None
    for stat in statistics:
        col = stat['columns'].index(aggregate)
        samples = [row[col] for row in stat['statistics']]
        result = float(sum(samples)) / len(samples)
    return result
def get_host_cpu_usage(self, resource, period,
                       aggregate, granularity=None):
    """Return the aggregated CPU usage of a compute node."""
    return self.statistic_aggregation(
        resource=resource, resource_type='compute_node',
        meter_name='host_cpu_usage', period=period,
        aggregate=aggregate, granularity=granularity)
def get_host_ram_usage(self, resource, period,
                       aggregate, granularity=None):
    # Monasca has no mapping for host RAM usage (see METRIC_MAP).
    raise NotImplementedError

def get_host_outlet_temp(self, resource, period,
                         aggregate, granularity=None):
    # Monasca has no mapping for host outlet temperature.
    raise NotImplementedError

def get_host_inlet_temp(self, resource, period,
                        aggregate, granularity=None):
    # Monasca has no mapping for host inlet temperature.
    raise NotImplementedError

def get_host_airflow(self, resource, period,
                     aggregate, granularity=None):
    # Monasca has no mapping for host airflow.
    raise NotImplementedError

def get_host_power(self, resource, period,
                   aggregate, granularity=None):
    # Monasca has no mapping for host power consumption.
    raise NotImplementedError
def get_instance_cpu_usage(self, resource, period,
                           aggregate, granularity=None):
    """Return the aggregated CPU usage of an instance."""
    return self.statistic_aggregation(
        resource=resource, resource_type='instance',
        meter_name='instance_cpu_usage', period=period,
        aggregate=aggregate, granularity=granularity)
def get_instance_ram_usage(self, resource, period,
                           aggregate, granularity=None):
    # Monasca has no mapping for instance RAM usage (see METRIC_MAP).
    raise NotImplementedError

def get_instance_ram_allocated(self, resource, period,
                               aggregate, granularity=None):
    # Monasca has no mapping for allocated instance RAM.
    raise NotImplementedError

def get_instance_l3_cache_usage(self, resource, period,
                                aggregate, granularity=None):
    # Monasca has no mapping for instance L3 cache usage.
    raise NotImplementedError

def get_instance_root_disk_size(self, resource, period,
                                aggregate, granularity=None):
    # Monasca has no mapping for instance root disk size.
    raise NotImplementedError