diff --git a/doc/source/deploy/configuration.rst b/doc/source/deploy/configuration.rst index 19d016ef5..e14639f5e 100644 --- a/doc/source/deploy/configuration.rst +++ b/doc/source/deploy/configuration.rst @@ -280,3 +280,21 @@ Configure Nova compute Please check your hypervisor configuration to correctly handle `instance migration`_. .. _`instance migration`: http://docs.openstack.org/admin-guide-cloud/compute-configuring-migrations.html + +Configure Ceilometer +==================== + +The default strategy 'basic_consolidation' provided by watcher requires +Ceilometer to collect the "compute.node.cpu.*." and "cpu_util" measurements + +#. Add/Update the following lines into the /etc/nova/nova.conf configuration file: + + + .. code-block:: bash + + $ compute_available_monitors=nova.compute.monitors.all_monitors + $ compute_monitors=ComputeDriverCPUMonitor + +#. Restart the Nova compute service and the Nova scheduler after completing the above configuration. + +#. For more information: `Integrating your metrics plug-in with Nova `_ diff --git a/etc/watcher/watcher.conf.sample b/etc/watcher/watcher.conf.sample index 6f6cd5a03..7e81dd0e9 100644 --- a/etc/watcher/watcher.conf.sample +++ b/etc/watcher/watcher.conf.sample @@ -4,7 +4,8 @@ # From watcher # -# Log output to standard error. (boolean value) +# Log output to standard error. This option is ignored if +# log_config_append is set. (boolean value) #use_stderr = true # Format string to use for log messages with context. (string value) @@ -19,17 +20,15 @@ # Prefix each line of exception output with this format. (string # value) -#logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s +#logging_exception_prefix = %(asctime)s.%(msecs)03d %(process)d ERROR %(name)s %(instance)s -# List of logger=LEVEL pairs. (list value) -#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN +# List of logger=LEVEL pairs. This option is ignored if +# log_config_append is set. (list value) +#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN # Enables or disables publication of error events. (boolean value) #publish_errors = false -# Enables or disables fatal status of deprecations. (boolean value) -#fatal_deprecations = false - # The format for an instance that is passed with the log message. # (string value) #instance_format = "[instance: %(uuid)s] " @@ -38,52 +37,75 @@ # (string value) #instance_uuid_format = "[instance: %(uuid)s] " +# Format string for user_identity field of the +# logging_context_format_string (string value) +#logging_user_identity_format = %(user)s %(tenant)s %(domain)s %(user_domain)s %(project_domain)s + # Print debugging output (set logging level to DEBUG instead of -# default WARNING level). (boolean value) +# default INFO level). (boolean value) #debug = false -# Print more verbose output (set logging level to INFO instead of -# default WARNING level). (boolean value) -#verbose = false +# If set to false, will disable INFO logging level, making WARNING the +# default. (boolean value) +# This option is deprecated for removal. +# Its value may be silently ignored in the future. +#verbose = true # The name of a logging configuration file. This file is appended to # any existing logging configuration files. For details about logging # configuration files, see the Python logging module documentation. -# (string value) +# Note that when logging configuration files are used then all logging +# configuration is set in the configuration file and other logging +# configuration options are ignored (for example, log_format). (string +# value) # Deprecated group/name - [DEFAULT]/log_config #log_config_append = # DEPRECATED. A logging.Formatter log message format string which may # use any of the available logging.LogRecord attributes. This option # is deprecated. Please use logging_context_format_string and -# logging_default_format_string instead. (string value) +# logging_default_format_string instead. This option is ignored if +# log_config_append is set. (string value) #log_format = # Format string for %%(asctime)s in log records. Default: %(default)s -# . (string value) +# . This option is ignored if log_config_append is set. (string value) #log_date_format = %Y-%m-%d %H:%M:%S # (Optional) Name of log file to output to. If no default is set, -# logging will go to stdout. (string value) +# logging will go to stdout. This option is ignored if +# log_config_append is set. (string value) # Deprecated group/name - [DEFAULT]/logfile #log_file = # (Optional) The base directory used for relative --log-file paths. -# (string value) +# This option is ignored if log_config_append is set. (string value) # Deprecated group/name - [DEFAULT]/logdir #log_dir = -# Use syslog for logging. Existing syslog format is DEPRECATED during -# I, and will change in J to honor RFC5424. (boolean value) +# (Optional) Uses logging handler designed to watch file system. When +# log file is moved or removed this handler will open a new log file +# with specified path instantaneously. It makes sense only if log-file +# option is specified and Linux platform is used. This option is +# ignored if log_config_append is set. (boolean value) +#watch_log_file = false + +# Use syslog for logging. Existing syslog format is DEPRECATED and +# will be changed later to honor RFC5424. This option is ignored if +# log_config_append is set. (boolean value) #use_syslog = false # (Optional) Enables or disables syslog rfc5424 format for logging. If # enabled, prefixes the MSG part of the syslog message with APP-NAME -# (RFC5424). The format without the APP-NAME is deprecated in I, and -# will be removed in J. (boolean value) -#use_syslog_rfc_format = false +# (RFC5424). The format without the APP-NAME is deprecated in Kilo, +# and will be removed in Mitaka, along with this option. This option +# is ignored if log_config_append is set. (boolean value) +# This option is deprecated for removal. +# Its value may be silently ignored in the future. +#use_syslog_rfc_format = true -# Syslog facility to receive log lines. (string value) +# Syslog facility to receive log lines. This option is ignored if +# log_config_append is set. (string value) #syslog_log_facility = LOG_USER @@ -433,45 +455,6 @@ #goals = BALANCE_LOAD:basic,MINIMIZE_ENERGY_CONSUMPTION:basic,MINIMIZE_LICENSING_COST:basic,PREPARE_PLANNED_OPERATION:basic,SERVERS_CONSOLIDATION:basic -[watcher_influxdb_collector] - -# -# From watcher -# - -# The hostname to connect to InfluxDB (string value) -#hostname = localhost - -# port to connect to InfluxDB, defaults to 8086 (integer value) -#port = 8086 - -# user to connect, defaults to root (string value) -#username = root - -# password of the user, defaults to root (string value) -#password = root - -# database name to connect to (string value) -#database = indeed - -# use https instead of http to connect to InfluxDB (boolean value) -#param ssl = false - -# number of seconds Requestswill wait for your client to establish a -# connection (integer value) -#timeout = 5 - -# number of seconds Requestswill wait for your client to establish a -# connection (integer value) -#timeout = 5 - -# use UDP to connect to InfluxDB (boolean value) -#use_udp = false - -# UDP port to connect to InfluxDB (integer value) -#udp_port = 4444 - - [watcher_messaging] # @@ -483,7 +466,7 @@ # The name of a message executor, forexample: eventlet, blocking # (string value) -#executor = eventlet +#executor = blocking # The protocol used by the message broker, for example rabbit (string # value) @@ -505,17 +488,6 @@ #virtual_host = -[watcher_metrics_collector] - -# -# From watcher -# - -# The driver that collect measurementsof the utilizationof the -# physical and virtual resources (string value) -#metrics_resource = influxdb - - [watcher_strategies] # diff --git a/requirements.txt b/requirements.txt index 7eb973696..d173bdbfb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,5 +28,4 @@ python-cinderclient==1.4.0 # Collector python-ceilometerclient==1.5.0 -influxdb==2.9.1 parsedatetime==1.5 diff --git a/setup.cfg b/setup.cfg index 855cb0a3d..ae3f96107 100644 --- a/setup.cfg +++ b/setup.cfg @@ -46,8 +46,6 @@ watcher.database.migration_backend = watcher_strategies = basic = watcher.decision_engine.strategies.basic_consolidation:BasicConsolidation -watcher_metrics_collector = - influxdb = watcher.metrics_engine.framework.datasources.influxdb_collector:InfluxDBCollector [build_sphinx] source-dir = doc/source diff --git a/watcher/applier/api/applier.py b/watcher/applier/api/applier.py index 381e96758..34d2a6270 100644 --- a/watcher/applier/api/applier.py +++ b/watcher/applier/api/applier.py @@ -17,7 +17,13 @@ # limitations under the License. # +import abc +import six + +@six.add_metaclass(abc.ABCMeta) class Applier(object): + @abc.abstractmethod def execute(self, action_plan_uuid): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/applier/api/command_mapper.py b/watcher/applier/api/command_mapper.py index c536f2e7c..4b6a21bb3 100644 --- a/watcher/applier/api/command_mapper.py +++ b/watcher/applier/api/command_mapper.py @@ -17,7 +17,13 @@ # limitations under the License. # +import abc +import six + +@six.add_metaclass(abc.ABCMeta) class CommandMapper(object): + @abc.abstractmethod def build_primitive_command(self, action): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/applier/api/messaging/applier_command.py b/watcher/applier/api/messaging/applier_command.py index 93288d249..880a0251f 100644 --- a/watcher/applier/api/messaging/applier_command.py +++ b/watcher/applier/api/messaging/applier_command.py @@ -17,7 +17,13 @@ # limitations under the License. # +import abc +import six + +@six.add_metaclass(abc.ABCMeta) class ApplierCommand(object): + @abc.abstractmethod def execute(self): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/applier/api/primitive_command.py b/watcher/applier/api/primitive_command.py index d8bdf91a3..48f023fa8 100644 --- a/watcher/applier/api/primitive_command.py +++ b/watcher/applier/api/primitive_command.py @@ -16,14 +16,21 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six from watcher.applier.api.promise import Promise +@six.add_metaclass(abc.ABCMeta) class PrimitiveCommand(object): @Promise + @abc.abstractmethod def execute(self): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover @Promise + @abc.abstractmethod def undo(self): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/applier/framework/command/hypervisor_state_command.py b/watcher/applier/framework/command/hypervisor_state_command.py index 726eb4187..3cddd3791 100644 --- a/watcher/applier/framework/command/hypervisor_state_command.py +++ b/watcher/applier/framework/command/hypervisor_state_command.py @@ -18,13 +18,12 @@ # -from keystoneclient.auth.identity import v3 -from keystoneclient import session from oslo_config import cfg from watcher.applier.api.primitive_command import PrimitiveCommand from watcher.applier.api.promise import Promise from watcher.applier.framework.command.wrapper.nova_wrapper import NovaWrapper +from watcher.common.keystone import Client from watcher.decision_engine.framework.model.hypervisor_state import \ HypervisorState @@ -37,25 +36,9 @@ class HypervisorStateCommand(PrimitiveCommand): self.status = status def nova_manage_service(self, status): - creds = \ - {'auth_url': CONF.keystone_authtoken.auth_uri, - 'username': CONF.keystone_authtoken.admin_user, - 'password': CONF.keystone_authtoken.admin_password, - 'project_name': CONF.keystone_authtoken.admin_tenant_name, - 'user_domain_name': "default", - 'project_domain_name': "default"} - - auth = v3.Password(auth_url=creds['auth_url'], - username=creds['username'], - password=creds['password'], - project_name=creds['project_name'], - user_domain_name=creds[ - 'user_domain_name'], - project_domain_name=creds[ - 'project_domain_name']) - sess = session.Session(auth=auth) - # todo(jed) refactoring - wrapper = NovaWrapper(creds, session=sess) + keystone = Client() + wrapper = NovaWrapper(keystone.get_credentials(), + session=keystone.get_session()) if status is True: return wrapper.enable_service_nova_compute(self.host) else: diff --git a/watcher/applier/framework/command/migrate_command.py b/watcher/applier/framework/command/migrate_command.py index 1fdb04cae..c26906403 100644 --- a/watcher/applier/framework/command/migrate_command.py +++ b/watcher/applier/framework/command/migrate_command.py @@ -24,6 +24,7 @@ from oslo_config import cfg from watcher.applier.api.primitive_command import PrimitiveCommand from watcher.applier.api.promise import Promise from watcher.applier.framework.command.wrapper.nova_wrapper import NovaWrapper +from watcher.common.keystone import Client from watcher.decision_engine.framework.default_planner import Primitives CONF = cfg.CONF @@ -40,25 +41,9 @@ class MigrateCommand(PrimitiveCommand): self.destination_hypervisor = destination_hypervisor def migrate(self, destination): - - creds = \ - {'auth_url': CONF.keystone_authtoken.auth_uri, - 'username': CONF.keystone_authtoken.admin_user, - 'password': CONF.keystone_authtoken.admin_password, - 'project_name': CONF.keystone_authtoken.admin_tenant_name, - 'user_domain_name': "default", - 'project_domain_name': "default"} - auth = v3.Password(auth_url=creds['auth_url'], - username=creds['username'], - password=creds['password'], - project_name=creds['project_name'], - user_domain_name=creds[ - 'user_domain_name'], - project_domain_name=creds[ - 'project_domain_name']) - sess = session.Session(auth=auth) - # todo(jed) add class - wrapper = NovaWrapper(creds, session=sess) + keystone = Client() + wrapper = NovaWrapper(keystone.get_credentials(), + session=keystone.get_session()) instance = wrapper.find_instance(self.instance_uuid) if instance: project_id = getattr(instance, "tenant_id") diff --git a/watcher/applier/framework/command/nop_command.py b/watcher/applier/framework/command/nop_command.py index 4f3d9ee98..d781b4baf 100644 --- a/watcher/applier/framework/command/nop_command.py +++ b/watcher/applier/framework/command/nop_command.py @@ -17,18 +17,25 @@ # limitations under the License. # +from oslo_log import log + from watcher.applier.api.primitive_command import PrimitiveCommand from watcher.applier.api.promise import Promise +LOG = log.getLogger(__name__) + + class NopCommand(PrimitiveCommand): def __init__(self): pass @Promise def execute(self): + LOG.debug("executing NOP command") return True @Promise def undo(self): + LOG.debug("undo NOP command") return True diff --git a/watcher/applier/framework/command/wrapper/nova_wrapper.py b/watcher/applier/framework/command/wrapper/nova_wrapper.py index e0bb40d5f..5a6c9c209 100644 --- a/watcher/applier/framework/command/wrapper/nova_wrapper.py +++ b/watcher/applier/framework/command/wrapper/nova_wrapper.py @@ -355,10 +355,10 @@ class NovaWrapper(object): and retry: instance = self.nova.servers.get(instance.id) LOG.debug( - "Waiting the migration of " + str( - instance.human_id) + " to " + - getattr(instance, - 'OS-EXT-SRV-ATTR:host')) + 'Waiting the migration of {0} to {1}'.format( + instance, + getattr(instance, + 'OS-EXT-SRV-ATTR:host'))) time.sleep(1) retry -= 1 diff --git a/watcher/applier/framework/command_executor.py b/watcher/applier/framework/command_executor.py index b313a75b1..ce312151c 100644 --- a/watcher/applier/framework/command_executor.py +++ b/watcher/applier/framework/command_executor.py @@ -66,9 +66,12 @@ class CommandExecutor(object): self.deploy.populate(primitive) self.notify(action, Status.SUCCESS) except Exception as e: - LOG.error( - "The applier module failed to execute the action" + str( - action) + " with the exception : " + unicode(e)) + LOG.debug( + 'The applier module failed to execute the action{0} with ' + 'the exception {1} '.format( + action, + unicode(e))) + LOG.error("Trigger a rollback") self.notify(action, Status.FAILED) self.deploy.rollback() diff --git a/watcher/cmd/decisionengine.py b/watcher/cmd/decisionengine.py index dd000d205..83240d99a 100644 --- a/watcher/cmd/decisionengine.py +++ b/watcher/cmd/decisionengine.py @@ -23,11 +23,11 @@ import sys from oslo_config import cfg from oslo_log import log as logging +from watcher.decision_engine.framework.manager import DecisionEngineManager -from watcher.decision_engine.framework.manager_decision_engine import \ - DecisionEngineManager from watcher import i18n + LOG = logging.getLogger(__name__) CONF = cfg.CONF _LI = i18n._LI diff --git a/watcher/common/ceilometer.py b/watcher/common/ceilometer.py new file mode 100644 index 000000000..bdb15dd06 --- /dev/null +++ b/watcher/common/ceilometer.py @@ -0,0 +1,173 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Authors: Jean-Emile DARTOIS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from urlparse import urlparse + +from ceilometerclient import client +from ceilometerclient.exc import HTTPUnauthorized +from watcher.common import keystone + + +class Client(object): + # todo olso conf: this must be sync with ceilometer + CEILOMETER_API_VERSION = '2' + + def __init__(self): + ksclient = keystone.Client() + self.creds = ksclient.get_credentials() + self.creds['os_auth_token'] = ksclient.get_token() + self.creds['token'] = ksclient.get_token() + self.creds['ceilometer_url'] = "http://" + urlparse( + ksclient.get_endpoint( + service_type='metering', + endpoint_type='publicURL')).netloc + self.connect() + + def connect(self): + """Initialization of Ceilometer client.""" + self.cmclient = client.get_client(self.CEILOMETER_API_VERSION, + **self.creds) + + def build_query(user_id=None, tenant_id=None, resource_id=None, + user_ids=None, tenant_ids=None, resource_ids=None): + """Returns query built from given parameters. + This query can be then used for querying resources, meters and + statistics. + :Parameters: + - `user_id`: user_id, has a priority over list of ids + - `tenant_id`: tenant_id, has a priority over list of ids + - `resource_id`: resource_id, has a priority over list of ids + - `user_ids`: list of user_ids + - `tenant_ids`: list of tenant_ids + - `resource_ids`: list of resource_ids + """ + user_ids = user_ids or [] + tenant_ids = tenant_ids or [] + resource_ids = resource_ids or [] + + query = [] + if user_id: + user_ids = [user_id] + for u_id in user_ids: + query.append({"field": "user_id", "op": "eq", "value": u_id}) + + if tenant_id: + tenant_ids = [tenant_id] + for t_id in tenant_ids: + query.append({"field": "project_id", "op": "eq", "value": t_id}) + + if resource_id: + resource_ids = [resource_id] + for r_id in resource_ids: + query.append({"field": "resource_id", "op": "eq", "value": r_id}) + + return query + + def query_sample(self, meter_name, query, limit=1): + try: + samples = self.ceilometerclient().samples.list( + meter_name=meter_name, + limit=limit, + q=query) + except HTTPUnauthorized: + self.connect() + samples = self.ceilometerclient().samples.list( + meter_name=meter_name, + limit=limit, + q=query) + except Exception: + raise + return samples + + def get_endpoint(self, service_type, endpoint_type=None): + ksclient = keystone.Client() + endpoint = ksclient.get_endpoint(service_type=service_type, + endpoint_type=endpoint_type) + return endpoint + + def statistic_list(self, meter_name, query=None, period=None): + """List of statistics.""" + statistics = self.ceilometerclient().statistics.list( + meter_name=meter_name, q=query, period=period) + return statistics + + def meter_list(self, query=None): + """List the user's meters.""" + meters = self.ceilometerclient().meters.list(query) + return meters + + def statistic_aggregation(self, + resource_id, + meter_name, + period, + aggregate='avg'): + """ + :param resource_id: id + :param meter_name: meter names of which we want the statistics + :param period: `period`: In seconds. If no period is given, only one + aggregate statistic is returned. If given, a faceted + result will be returned, divided into given periods. + Periods with no data are ignored. + :param aggregate: + :return: + """ + """Representing a statistic aggregate by operators""" + + query = self.build_query(resource_id=resource_id) + try: + statistic = self.cmclient.statistics.list( + meter_name=meter_name, + q=query, + period=period, + aggregates=[ + {'func': aggregate}], + groupby=['resource_id']) + except HTTPUnauthorized: + self.connect() + statistic = self.cmclient.statistics.list( + meter_name=meter_name, + q=query, + period=period, + aggregates=[ + {'func': aggregate}], + groupby=['resource_id']) + except Exception: + raise + item_value = None + if statistic: + item_value = statistic[-1]._info.get('aggregate').get('avg') + return item_value + + def get_last_sample_values(self, resource_id, meter_name, limit=1): + samples = self.query_sample(meter_name=meter_name, + query=self.build_query(resource_id)) + values = [] + for index, sample in enumerate(samples): + values.append( + {'sample_%s' % index: {'timestamp': sample._info['timestamp'], + 'value': sample._info[ + 'counter_volume']}}) + return values + + def get_last_sample_value(self, resource_id, meter_name): + samples = self.query_sample(meter_name=meter_name, + query=self.build_query(resource_id)) + if samples: + return samples[-1]._info['counter_volume'] + else: + return False diff --git a/watcher/metrics_engine/framework/collector_manager.py b/watcher/common/keystone.py similarity index 50% rename from watcher/metrics_engine/framework/collector_manager.py rename to watcher/common/keystone.py index 73ae94aea..69b7ad84a 100644 --- a/watcher/metrics_engine/framework/collector_manager.py +++ b/watcher/common/keystone.py @@ -16,32 +16,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import datetime + from keystoneclient.auth.identity import v3 from keystoneclient import session - +import keystoneclient.v3.client as ksclient from oslo_config import cfg from oslo_log import log -from stevedore import driver -from watcher.applier.framework.command.wrapper.nova_wrapper import NovaWrapper - -from watcher.metrics_engine.framework.statedb_collector import NovaCollector LOG = log.getLogger(__name__) CONF = cfg.CONF -WATCHER_METRICS_COLLECTOR_OPTS = [ - cfg.StrOpt('metrics_resource', - default="influxdb", - help='The driver that collect measurements' - 'of the utilization' - 'of the physical and virtual resources') -] -metrics_collector_opt_group = cfg.OptGroup( - name='watcher_collector', - title='Defines Metrics collector available') -CONF.register_group(metrics_collector_opt_group) -CONF.register_opts(WATCHER_METRICS_COLLECTOR_OPTS, metrics_collector_opt_group) - CONF.import_opt('admin_user', 'keystonemiddleware.auth_token', group='keystone_authtoken') CONF.import_opt('admin_tenant_name', 'keystonemiddleware.auth_token', @@ -52,16 +37,28 @@ CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token', group='keystone_authtoken') -class CollectorManager(object): - def get_metric_collector(self): - manager = driver.DriverManager( - namespace='watcher_metrics_collector', - name=CONF.watcher_collector.metrics_resource, - invoke_on_load=True, - ) - return manager.driver +class Client(object): + def __init__(self): + ks_args = self.get_credentials() + self.ks_client = ksclient.Client(**ks_args) - def get_statedb_collector(self): + def get_endpoint(self, **kwargs): + attr = None + filter_value = None + if kwargs.get('region_name'): + attr = 'region' + filter_value = kwargs.get('region_name') + return self.ks_client.service_catalog.url_for( + service_type=kwargs.get('service_type') or 'metering', + attr=attr, + filter_value=filter_value, + endpoint_type=kwargs.get('endpoint_type') or 'publicURL') + + def get_token(self): + return self.ks_client.auth_token + + @staticmethod + def get_credentials(): creds = \ {'auth_url': CONF.keystone_authtoken.auth_uri, 'username': CONF.keystone_authtoken.admin_user, @@ -69,15 +66,15 @@ class CollectorManager(object): 'project_name': CONF.keystone_authtoken.admin_tenant_name, 'user_domain_name': "default", 'project_domain_name': "default"} + LOG.debug(creds) + return creds - auth = v3.Password(auth_url=creds['auth_url'], - username=creds['username'], - password=creds['password'], - project_name=creds['project_name'], - user_domain_name=creds[ - 'user_domain_name'], - project_domain_name=creds[ - 'project_domain_name']) - sess = session.Session(auth=auth) - wrapper = NovaWrapper(creds, session=sess) - return NovaCollector(wrapper=wrapper) + def get_session(self): + creds = self.get_credentials() + auth = v3.Password(**creds) + return session.Session(auth=auth) + + def is_token_expired(self, token): + expires = datetime.datetime.strptime(token['expires'], + '%Y-%m-%dT%H:%M:%SZ') + return datetime.datetime.now() >= expires diff --git a/watcher/decision_engine/README.md b/watcher/decision_engine/README.md index 54f326808..f4713bc3d 100644 --- a/watcher/decision_engine/README.md +++ b/watcher/decision_engine/README.md @@ -46,7 +46,7 @@ This component pushes the new predicted metrics to the CEP in order to trigger n ## Watcher Cluster State Collector -This module of the Decision Engine provides a high level API for requesting status information from the InfluxDb database. +This module of the Decision Engine provides a high level API for requesting status information from the Nova API. A DSL will be provided in order to ease the development of new optimization strategies. @@ -55,14 +55,6 @@ Example of high level requests that may be provided : * get the state evolution in time of a group of instances from 9 AM to 10 AM for every day of the week * ... -## Watcher Resource Metrics Collector - -This module of the Decision Engine provides a high level API for requesting metrics information from the InfluxDb database. - -A DSL will be provided in order to ease the development of new optimization strategies. - -This component is distinct from the Cluster State Collector because it will probably have to deal with a much more important set of data and it may need a specific DSL for applying mathematical computes on metrics (min, max, average, ...). - ## Watcher Actions Planner diff --git a/watcher/decision_engine/api/messaging/decision_engine_command.py b/watcher/decision_engine/api/messaging/decision_engine_command.py index eac64aaef..27a8ce45a 100644 --- a/watcher/decision_engine/api/messaging/decision_engine_command.py +++ b/watcher/decision_engine/api/messaging/decision_engine_command.py @@ -16,8 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six -class DecisionEngineCommand(object): +@six.add_metaclass(abc.ABCMeta) +class BaseDecisionEngineCommand(object): + @abc.abstractmethod def execute(self): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/api/messaging/event_consumer.py b/watcher/decision_engine/api/messaging/event_consumer.py index 9ae57ff7c..2b7c56563 100644 --- a/watcher/decision_engine/api/messaging/event_consumer.py +++ b/watcher/decision_engine/api/messaging/event_consumer.py @@ -16,15 +16,24 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six -class EventConsumer(object): +@six.add_metaclass(abc.ABCMeta) +class BaseEventConsumer(object): def __init__(self): - self.messaging = None + self._messaging = None - def set_messaging(self, messaging): - self.messaging = messaging + @property + def messaging(self): + return self._messaging + @messaging.setter + def messaging(self, e): + self._messaging = e + + @abc.abstractmethod def execute(self, request_id, context, data): - raise NotImplementedError('Not implemented ...') + raise NotImplementedError('Not implemented ...') # pragma:no cover diff --git a/watcher/decision_engine/api/planner/planner.py b/watcher/decision_engine/api/planner/planner.py index a502f84fc..ac001d9dd 100644 --- a/watcher/decision_engine/api/planner/planner.py +++ b/watcher/decision_engine/api/planner/planner.py @@ -16,9 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six +@six.add_metaclass(abc.ABCMeta) class Planner(object): + @abc.abstractmethod def schedule(self, context, audit_uuid, solution): """The planner receives a solution to schedule @@ -29,4 +33,5 @@ class Planner(object): and performance requirements are met. """ # example: directed acyclic graph - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/api/solution/solution.py b/watcher/decision_engine/api/solution/solution.py index 602304213..62f8d638a 100644 --- a/watcher/decision_engine/api/solution/solution.py +++ b/watcher/decision_engine/api/solution/solution.py @@ -16,22 +16,47 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six +@six.add_metaclass(abc.ABCMeta) class Solution(object): def __init__(self): - self.modelOrigin = None - self.currentModel = None - self.efficiency = 0 + self._origin = None + self._model = None + self._efficiency = 0 - def get_efficiency(self): - return self.efficiency + @property + def efficiency(self): + return self._efficiency - def set_efficiency(self, efficiency): - self.efficiency = efficiency + @efficiency.setter + def efficiency(self, e): + self._efficiency = e - def set_model(self, current_model): - self.currentModel = current_model + @property + def model(self): + return self._model - def get_model(self): - return self.currentModel + @model.setter + def model(self, m): + self._model = m + + @property + def origin(self): + return self._origin + + @origin.setter + def origin(self, m): + self._origin = m + + @abc.abstractmethod + def add_change_request(self, r): + raise NotImplementedError( + "Should have implemented this") # pragma:no cover + + @abc.abstractproperty + def meta_actions(self): + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/api/solution/solution_comparator.py b/watcher/decision_engine/api/solution/solution_comparator.py index 57484aee8..b6a0e2224 100644 --- a/watcher/decision_engine/api/solution/solution_comparator.py +++ b/watcher/decision_engine/api/solution/solution_comparator.py @@ -16,8 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six +@six.add_metaclass(abc.ABCMeta) class Solution(object): + @abc.abstractmethod def compare(self, sol1, sol2): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/api/solution/solution_evaluator.py b/watcher/decision_engine/api/solution/solution_evaluator.py index abe695664..5ec711ffb 100644 --- a/watcher/decision_engine/api/solution/solution_evaluator.py +++ b/watcher/decision_engine/api/solution/solution_evaluator.py @@ -16,8 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six +@six.add_metaclass(abc.ABCMeta) class SolutionEvaluator(object): + @abc.abstractmethod def evaluate(self, solution): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/api/strategy/meta_action.py b/watcher/decision_engine/api/strategy/meta_action.py index 6f1aecc9e..7e2a79921 100644 --- a/watcher/decision_engine/api/strategy/meta_action.py +++ b/watcher/decision_engine/api/strategy/meta_action.py @@ -16,26 +16,33 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six from watcher.decision_engine.api.strategy.strategy import StrategyLevel +@six.add_metaclass(abc.ABCMeta) class MetaAction(object): def __init__(self): - self.level = StrategyLevel.conservative - self.priority = 0 + self._level = StrategyLevel.conservative + self._priority = 0 - def get_level(self): - return self.level + @property + def level(self): + return self._level - def set_level(self, level): - self.level = level + @level.setter + def level(self, l): + self._level = l - def set_priority(self, priority): - self.priority = priority + @property + def priority(self): + return self._priority - def get_priority(self): - return self.priority + @priority.setter + def priority(self, p): + self._priority = p def __str__(self): return " " diff --git a/watcher/decision_engine/api/strategy/selector.py b/watcher/decision_engine/api/strategy/selector.py index f06cb3ef9..e76fba174 100644 --- a/watcher/decision_engine/api/strategy/selector.py +++ b/watcher/decision_engine/api/strategy/selector.py @@ -16,8 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six +@six.add_metaclass(abc.ABCMeta) class Selector(object): + @abc.abstractmethod def define_from_goal(self, goal_name): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/api/strategy/strategy.py b/watcher/decision_engine/api/strategy/strategy.py index b3a3af55f..3faa1c383 100644 --- a/watcher/decision_engine/api/strategy/strategy.py +++ b/watcher/decision_engine/api/strategy/strategy.py @@ -28,34 +28,13 @@ LOG = log.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) class Strategy(object): def __init__(self, name=None, description=None): - self.name = name + self._name = name self.description = description # default strategy level - self.strategy_level = StrategyLevel.conservative - self.metrics_collector = None - self.cluster_state_collector = None + self._strategy_level = StrategyLevel.conservative + self._cluster_state_collector = None # the solution given by the strategy - self.solution = DefaultSolution() - - def get_solution(self): - return self.solution - - def set_name(self, name): - self.name = name - - def get_name(self): - return self.name - - def get_strategy_strategy_level(self): - return self.strategy_level - - def set_strategy_strategy_level(self, strategy_level): - """Convervative to Aggressive - - the aims is to minimize le number of migrations - :param threshold: - """ - self.strategy_level = strategy_level + self._solution = DefaultSolution() @abc.abstractmethod def execute(self, model): @@ -65,14 +44,34 @@ class Strategy(object): :return: """ - def get_metrics_resource_collector(self): - return self.metrics_collector + @property + def solution(self): + return self._solution - def get_cluster_state_collector(self): - return self.cluster_state_collector + @solution.setter + def solution(self, s): + self._solution = s - def set_metrics_resource_collector(self, metrics_collector): - self.metrics_collector = metrics_collector + @property + def name(self): + return self._name - def set_cluster_state_collector(self, cluster_state_collector): - self.cluster_state_collector = cluster_state_collector + @name.setter + def name(self, n): + self._name = n + + @property + def strategy_level(self): + return self._strategy_level + + @strategy_level.setter + def strategy_level(self, s): + self._strategy_level = s + + @property + def state_collector(self): + return self._cluster_state_collector + + @state_collector.setter + def state_collector(self, s): + self._cluster_state_collector = s diff --git a/watcher/decision_engine/api/strategy/strategy_context.py b/watcher/decision_engine/api/strategy/strategy_context.py index 085a94629..706e47994 100644 --- a/watcher/decision_engine/api/strategy/strategy_context.py +++ b/watcher/decision_engine/api/strategy/strategy_context.py @@ -16,11 +16,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc +import six -class StrategyContext(object): - def __init__(self): - pass - +@six.add_metaclass(abc.ABCMeta) +class BaseStrategyContext(object): + @abc.abstractmethod def execute_strategy(self, model): - raise NotImplementedError("Should have implemented this") + raise NotImplementedError( + "Should have implemented this") # pragma:no cover diff --git a/watcher/decision_engine/framework/command/trigger_audit_command.py b/watcher/decision_engine/framework/command/trigger_audit_command.py index de8ee1d94..10eaf6c7d 100644 --- a/watcher/decision_engine/framework/command/trigger_audit_command.py +++ b/watcher/decision_engine/framework/command/trigger_audit_command.py @@ -17,11 +17,11 @@ from oslo_log import log from watcher.common.messaging.events.event import Event from watcher.decision_engine.api.messaging.decision_engine_command import \ - DecisionEngineCommand + BaseDecisionEngineCommand from watcher.decision_engine.framework.default_planner import DefaultPlanner from watcher.decision_engine.framework.messaging.events import Events -from watcher.decision_engine.framework.strategy.StrategyManagerImpl import \ - StrategyContextImpl +from watcher.decision_engine.framework.strategy.strategy_context import \ + StrategyContext from watcher.objects.audit import Audit from watcher.objects.audit import AuditStatus from watcher.objects.audit_template import AuditTemplate @@ -29,12 +29,11 @@ from watcher.objects.audit_template import AuditTemplate LOG = log.getLogger(__name__) -class TriggerAuditCommand(DecisionEngineCommand): - def __init__(self, messaging, statedb, ressourcedb): +class TriggerAuditCommand(BaseDecisionEngineCommand): + def __init__(self, messaging, model_collector): self.messaging = messaging - self.statedb = statedb - self.ressourcedb = ressourcedb - self.strategy_context = StrategyContextImpl() + self.model_collector = model_collector + self.strategy_context = StrategyContext() def notify(self, audit_uuid, event_type, status): event = Event() @@ -46,7 +45,7 @@ class TriggerAuditCommand(DecisionEngineCommand): payload) def update_audit(self, request_context, audit_uuid, state): - LOG.debug("update audit " + str(state)) + LOG.debug("update audit {0} ".format(state)) audit = Audit.get_by_uuid(request_context, audit_uuid) audit.state = state audit.save() @@ -61,16 +60,14 @@ class TriggerAuditCommand(DecisionEngineCommand): audit = self.update_audit(request_context, audit_uuid, AuditStatus.ONGOING) - # 3 - Retrieve metrics - cluster = self.statedb.get_latest_state_cluster() + # 3 - Retrieve cluster-data-model + cluster = self.model_collector.get_latest_cluster_data_model() # 4 - Select appropriate strategy audit_template = AuditTemplate.get_by_id(request_context, audit.audit_template_id) self.strategy_context.set_goal(audit_template.goal) - self.strategy_context.set_metrics_resource_collector( - self.ressourcedb) # 5 - compute change requests solution = self.strategy_context.execute_strategy(cluster) @@ -83,4 +80,4 @@ class TriggerAuditCommand(DecisionEngineCommand): self.update_audit(request_context, audit_uuid, AuditStatus.SUCCESS) except Exception as e: self.update_audit(request_context, audit_uuid, AuditStatus.FAILED) - LOG.error(" " + unicode(e)) + LOG.error("Execute audit command {0} ".format(unicode(e))) diff --git a/watcher/decision_engine/framework/default_planner.py b/watcher/decision_engine/framework/default_planner.py index 16061cb04..8f76fad11 100644 --- a/watcher/decision_engine/framework/default_planner.py +++ b/watcher/decision_engine/framework/default_planner.py @@ -28,6 +28,7 @@ from watcher import objects from watcher.decision_engine.framework.meta_actions.hypervisor_state import \ ChangeHypervisorState from watcher.decision_engine.framework.meta_actions.migrate import Migrate +from watcher.decision_engine.framework.meta_actions.nop import Nop from watcher.decision_engine.framework.meta_actions.power_state import \ ChangePowerState from watcher.objects.action import Status as AStatus @@ -35,6 +36,7 @@ from watcher.objects.action_plan import Status as APStatus LOG = log.getLogger(__name__) + # TODO(jed) The default planner is a very simple planner # https://wiki.openstack.org/wiki/NovaOrchestration/WorkflowEngines​ @@ -95,7 +97,8 @@ class DefaultPlanner(Planner): uuid, action.get_dest_hypervisor(). uuid, - description=str(action) + description="{0}".format( + action) ) elif isinstance(action, ChangePowerState): @@ -105,7 +108,9 @@ class DefaultPlanner(Planner): applies_to=action.target.uuid, parameter=action. powerstate. - value, description=str(action)) + value, + description="{0}".format( + action)) elif isinstance(action, ChangeHypervisorState): primitive = self.create_action(action_plan_id=action_plan.id, action_type=Primitives. @@ -113,8 +118,14 @@ class DefaultPlanner(Planner): applies_to=action.target.uuid, parameter=action.state. value, - description=str(action)) - + description="{0}".format( + action)) + elif isinstance(action, Nop): + primitive = self.create_action(action_plan_id=action_plan.id, + action_type=Primitives. + NOP.value, + description="{0}".format( + action)) else: raise MetaActionNotFound() priority = priority_primitives[primitive['action_type']] diff --git a/watcher/decision_engine/framework/manager_decision_engine.py b/watcher/decision_engine/framework/manager.py similarity index 94% rename from watcher/decision_engine/framework/manager_decision_engine.py rename to watcher/decision_engine/framework/manager.py index 48bba745c..c3844d19c 100644 --- a/watcher/decision_engine/framework/manager_decision_engine.py +++ b/watcher/decision_engine/framework/manager.py @@ -32,8 +32,8 @@ from watcher.decision_engine.framework.messaging.events import Events from watcher.common.messaging.notification_handler import \ NotificationHandler -from watcher.decision_engine.framework.strategy.StrategyManagerImpl import \ - StrategyContextImpl +from watcher.decision_engine.framework.strategy.strategy_context import \ + StrategyContext LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -76,7 +76,7 @@ class DecisionEngineManager(MessagingCore): # todo(jed) oslo_conf self.executor = ThreadPoolExecutor(max_workers=2) self.topic_control.add_endpoint(AuditEndpoint(self)) - self.context = StrategyContextImpl(self) + self.context = StrategyContext(self) def join(self): self.topic_control.join() @@ -93,7 +93,7 @@ class DecisionEngineManager(MessagingCore): LOG.debug("data => %s" % str(data)) event_consumer = EventConsumerFactory().factory(event_type) - event_consumer.set_messaging(self) + event_consumer.messaging = self event_consumer.execute(request_id, data) except Exception as e: LOG.error("evt %s" % e.message) diff --git a/watcher/decision_engine/framework/messaging/audit_endpoint.py b/watcher/decision_engine/framework/messaging/audit_endpoint.py index 63b5a6f65..eb55825dd 100644 --- a/watcher/decision_engine/framework/messaging/audit_endpoint.py +++ b/watcher/decision_engine/framework/messaging/audit_endpoint.py @@ -20,7 +20,8 @@ from oslo_log import log from watcher.decision_engine.framework.command.trigger_audit_command import \ TriggerAuditCommand -from watcher.metrics_engine.framework.collector_manager import CollectorManager +from watcher.metrics_engine.cluster_model_collector.manager import \ + CollectorManager LOG = log.getLogger(__name__) @@ -31,11 +32,9 @@ class AuditEndpoint(object): self.manager = CollectorManager() def do_trigger_audit(self, context, audit_uuid): - statedb = self.manager.get_statedb_collector() - ressourcedb = self.manager.get_metric_collector() + model_collector = self.manager.get_cluster_model_collector() - audit = TriggerAuditCommand(self.de, statedb, - ressourcedb) + audit = TriggerAuditCommand(self.de, model_collector) audit.execute(audit_uuid, context) def trigger_audit(self, context, audit_uuid): diff --git a/watcher/decision_engine/framework/meta_actions/hypervisor_state.py b/watcher/decision_engine/framework/meta_actions/hypervisor_state.py index 48468de5e..9dcde2db8 100644 --- a/watcher/decision_engine/framework/meta_actions/hypervisor_state.py +++ b/watcher/decision_engine/framework/meta_actions/hypervisor_state.py @@ -49,5 +49,5 @@ class ChangeHypervisorState(MetaAction): self._target = p def __str__(self): - return MetaAction.__str__(self) + " ChangeHypervisorState" + str( - self.target) + " =>" + str(self.state) + return "{0} {1} ChangeHypervisorState => {2}".format( + MetaAction.__str__(self), self.target, self.state) diff --git a/watcher/decision_engine/framework/meta_actions/migrate.py b/watcher/decision_engine/framework/meta_actions/migrate.py index fc5201561..4c0f6861a 100644 --- a/watcher/decision_engine/framework/meta_actions/migrate.py +++ b/watcher/decision_engine/framework/meta_actions/migrate.py @@ -70,6 +70,7 @@ class Migrate(MetaAction): return self.dest_hypervisor def __str__(self): - return MetaAction.__str__(self) + " Migrate " + str( - self.vm) + " from " + str( - self.source_hypervisor) + " to " + str(self.dest_hypervisor) + return "{0} Migrate {1} from {2} to {3}".format( + MetaAction.__str__(self), self.vm, + self.source_hypervisor, + self.dest_hypervisor) diff --git a/watcher/tests/decision_engine/test_loader.py b/watcher/decision_engine/framework/meta_actions/nop.py similarity index 79% rename from watcher/tests/decision_engine/test_loader.py rename to watcher/decision_engine/framework/meta_actions/nop.py index bbf5f77fa..8bf1533cd 100644 --- a/watcher/tests/decision_engine/test_loader.py +++ b/watcher/decision_engine/framework/meta_actions/nop.py @@ -16,10 +16,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from watcher.tests import base +from watcher.decision_engine.api.strategy.meta_action import MetaAction -class TestLoader(base.BaseTestCase): - - def test_loader(self): - pass +class Nop(MetaAction): + def __str__(self): + return "{0} Nop".format(MetaAction.__str__(self)) diff --git a/watcher/decision_engine/framework/meta_actions/power_state.py b/watcher/decision_engine/framework/meta_actions/power_state.py index ee98f198f..427a21649 100644 --- a/watcher/decision_engine/framework/meta_actions/power_state.py +++ b/watcher/decision_engine/framework/meta_actions/power_state.py @@ -48,5 +48,6 @@ class ChangePowerState(MetaAction): self._target = p def __str__(self): - return MetaAction.__str__(self) + "ChangePowerState " + str( - self.target) + " => " + str(self.powerstate) + return "{0} ChangePowerState {1} => {2} ".format( + MetaAction.__str__(self), + self.target, self.powerstate) diff --git a/watcher/decision_engine/framework/model/mapping.py b/watcher/decision_engine/framework/model/mapping.py index ed8bc94c5..53c0505a7 100644 --- a/watcher/decision_engine/framework/model/mapping.py +++ b/watcher/decision_engine/framework/model/mapping.py @@ -69,9 +69,10 @@ class Mapping(object): # remove vm self.mapping_vm.pop(vm_uuid) else: - LOG.warn("trying to delete the virtual machine " + str( - vm_uuid) + " but it was not found on hypervisor" + str( - node_uuid)) + LOG.warn( + "trying to delete the virtual machine {0} but it was not " + "found on hypervisor {1}".format( + vm_uuid, node_uuid)) finally: self.lock.release() diff --git a/watcher/decision_engine/framework/model/model_root.py b/watcher/decision_engine/framework/model/model_root.py index 396cebee1..0261011d5 100644 --- a/watcher/decision_engine/framework/model/model_root.py +++ b/watcher/decision_engine/framework/model/model_root.py @@ -77,7 +77,7 @@ class ModelRoot(object): return self.mapping def create_resource(self, r): - self.resource[str(r.get_name())] = r + self.resource[str(r.name)] = r def get_resource_from_id(self, id): return self.resource[str(id)] diff --git a/watcher/decision_engine/framework/model/named_element.py b/watcher/decision_engine/framework/model/named_element.py index 1bcf72b65..bf027e56a 100644 --- a/watcher/decision_engine/framework/model/named_element.py +++ b/watcher/decision_engine/framework/model/named_element.py @@ -38,4 +38,4 @@ class NamedElement(object): self._human_id = h def __str__(self): - return "[" + str(self.uuid) + "]" + return "[{0}]".format(self.uuid) diff --git a/watcher/decision_engine/framework/model/resource.py b/watcher/decision_engine/framework/model/resource.py index 3586bf9f4..862bde0ff 100644 --- a/watcher/decision_engine/framework/model/resource.py +++ b/watcher/decision_engine/framework/model/resource.py @@ -31,12 +31,17 @@ class Resource(object): :param capacity: max :return: """ - self.name = name + self._name = name self.capacity = capacity self.mapping = {} - def get_name(self): - return self.name + @property + def name(self): + return self._name + + @name.setter + def name(self, n): + self._name = n def set_capacity(self, element, value): self.mapping[element.uuid] = value diff --git a/watcher/decision_engine/framework/rpcapi.py b/watcher/decision_engine/framework/rpcapi.py index b60535f40..15dc3a1d3 100644 --- a/watcher/decision_engine/framework/rpcapi.py +++ b/watcher/decision_engine/framework/rpcapi.py @@ -32,9 +32,9 @@ from watcher.common.messaging.utils.transport_url_builder import \ TransportUrlBuilder from watcher.decision_engine.framework.events.event_consumer_factory import \ EventConsumerFactory -from watcher.decision_engine.framework.manager_decision_engine import \ +from watcher.decision_engine.framework.manager import \ decision_engine_opt_group -from watcher.decision_engine.framework.manager_decision_engine import \ +from watcher.decision_engine.framework.manager import \ WATCHER_DECISION_ENGINE_OPTS from watcher.decision_engine.framework.messaging.events import Events diff --git a/watcher/decision_engine/framework/strategy/StrategyManagerImpl.py b/watcher/decision_engine/framework/strategy/strategy_context.py similarity index 82% rename from watcher/decision_engine/framework/strategy/StrategyManagerImpl.py rename to watcher/decision_engine/framework/strategy/strategy_context.py index 4c01cae0f..5d37719d1 100644 --- a/watcher/decision_engine/framework/strategy/StrategyManagerImpl.py +++ b/watcher/decision_engine/framework/strategy/strategy_context.py @@ -15,8 +15,8 @@ # limitations under the License. from oslo_log import log -from watcher.decision_engine.api.strategy.strategy_context import \ - StrategyContext +from watcher.decision_engine.api.strategy.strategy_context import\ + BaseStrategyContext from watcher.decision_engine.framework.default_planner import DefaultPlanner from watcher.decision_engine.framework.strategy.strategy_selector import \ StrategySelector @@ -24,7 +24,7 @@ from watcher.decision_engine.framework.strategy.strategy_selector import \ LOG = log.getLogger(__name__) -class StrategyContextImpl(StrategyContext): +class StrategyContext(BaseStrategyContext): def __init__(self, broker=None): LOG.debug("Initializing decision_engine Engine API ") self.strategies = {} @@ -33,7 +33,6 @@ class StrategyContextImpl(StrategyContext): self.planner = DefaultPlanner() self.strategy_selector = StrategySelector() self.goal = None - self.metrics_resource_collector = None def add_strategy(self, strategy): self.strategies[strategy.name] = strategy @@ -45,12 +44,7 @@ class StrategyContextImpl(StrategyContext): def set_goal(self, goal): self.goal = goal - def set_metrics_resource_collector(self, metrics_resource_collector): - self.metrics_resource_collector = metrics_resource_collector - def execute_strategy(self, model): # todo(jed) create thread + refactoring selected_strategy = self.strategy_selector.define_from_goal(self.goal) - selected_strategy.set_metrics_resource_collector( - self.metrics_resource_collector) return selected_strategy.execute(model) diff --git a/watcher/decision_engine/strategies/basic_consolidation.py b/watcher/decision_engine/strategies/basic_consolidation.py index 5fa881664..8817fa2bc 100644 --- a/watcher/decision_engine/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategies/basic_consolidation.py @@ -18,16 +18,11 @@ # from oslo_log import log -from watcher.decision_engine.framework.model.vm_state import VMState -from watcher.metrics_engine.api.metrics_resource_collector import \ - AggregationFunction - from watcher.common.exception import ClusterEmpty from watcher.common.exception import ClusteStateNotDefined -from watcher.common.exception import MetricCollectorNotDefined -from watcher.common.exception import NoDataFound from watcher.decision_engine.api.strategy.strategy import Strategy from watcher.decision_engine.api.strategy.strategy import StrategyLevel + from watcher.decision_engine.framework.meta_actions.hypervisor_state import \ ChangeHypervisorState from watcher.decision_engine.framework.meta_actions.migrate import Migrate @@ -40,6 +35,9 @@ from watcher.decision_engine.framework.meta_actions.power_state import \ from watcher.decision_engine.framework.model.hypervisor_state import \ HypervisorState from watcher.decision_engine.framework.model.resource import ResourceType +from watcher.decision_engine.framework.model.vm_state import VMState +from watcher.metrics_engine.cluster_history.ceilometer import \ + CeilometerClusterHistory LOG = log.getLogger(__name__) @@ -73,7 +71,7 @@ class BasicConsolidation(Strategy): :param name: the name of the strategy :param description: a description of the strategy """ - Strategy.__init__(self, name, description) + super(BasicConsolidation, self).__init__(name, description) # set default value for the number of released nodes self.number_of_released_nodes = 0 @@ -85,6 +83,8 @@ class BasicConsolidation(Strategy): # set default value for the efficiency self.efficiency = 100 + self._ceilometer = None + # TODO(jed) improve threshold overbooking ?,... self.threshold_mem = 1 self.threshold_disk = 1 @@ -101,6 +101,16 @@ class BasicConsolidation(Strategy): # TODO(jed) bound migration attempts (80 %) self.bound_migration = 0.80 + @property + def ceilometer(self): + if self._ceilometer is None: + self._ceilometer = CeilometerClusterHistory() + return self._ceilometer + + @ceilometer.setter + def ceilometer(self, c): + self._ceilometer = c + def compute_attempts(self, size_cluster): """Upper bound of the number of migration @@ -123,10 +133,10 @@ class BasicConsolidation(Strategy): if src_hypervisor == dest_hypervisor: return False - LOG.debug('Migrate VM %s from %s to %s ', - str(src_hypervisor), - str(dest_hypervisor), - str(vm_to_mig)) + LOG.debug('Migrate VM {0} from {1} to {2} '.format(vm_to_mig, + src_hypervisor, + dest_hypervisor, + )) total_cores = 0 total_disk = 0 @@ -175,12 +185,6 @@ class BasicConsolidation(Strategy): cores_available = cap_cores.get_capacity(dest_hypervisor) disk_available = cap_disk.get_capacity(dest_hypervisor) mem_available = cap_mem.get_capacity(dest_hypervisor) - LOG.debug("VCPU %s/%s ", str(total_cores * self.threshold_cores), - str(cores_available), ) - LOG.debug("DISK %s/%s ", str(total_disk * self.threshold_disk), - str(disk_available), ) - LOG.debug("MEM %s/%s ", str(total_mem * self.threshold_mem), - str(mem_available)) if cores_available >= total_cores * self.threshold_cores \ and disk_available >= total_disk * self.threshold_disk \ @@ -213,7 +217,6 @@ class BasicConsolidation(Strategy): def calculate_weight(self, model, element, total_cores_used, total_disk_used, total_memory_used): """Calculate weight of every - :param model: :param element: :param total_cores_used: @@ -253,24 +256,22 @@ class BasicConsolidation(Strategy): :param model: :return: """ - metrics_collector = self.get_metrics_resource_collector() - if metrics_collector is None: - raise MetricCollectorNotDefined() + cpu_avg_vm = self.ceilometer. \ + statistic_aggregation(resource_id=hypervisor.uuid, + meter_name='compute.node.cpu.percent', + period="7200", + aggregate='avg' + ) + if cpu_avg_vm is None: + LOG.error( + "No values returned for {0} compute.node.cpu.percent".format( + hypervisor.uuid)) + cpu_avg_vm = 100 - cpu_compute_mean_16h = metrics_collector.get_measurement( - metric='compute_cpu_user_percent_gauge', - aggregation_function=AggregationFunction.MEAN, - start_time="16 hours before now", - filters=["resource_id=" + hypervisor.uuid + ""]) - if len(cpu_compute_mean_16h) > 0: - cpu_capacity = model.get_resource_from_id( - ResourceType.cpu_cores).get_capacity(hypervisor) - cpu_utilization = float(cpu_compute_mean_16h[0].value) - total_cores_used = cpu_capacity * (cpu_utilization / 100) - else: - raise NoDataFound( - "No values returned for " + str(hypervisor.uuid) + - " compute_cpu_percent_gauge") + cpu_capacity = model.get_resource_from_id( + ResourceType.cpu_cores).get_capacity(hypervisor) + + total_cores_used = cpu_capacity * (cpu_avg_vm / 100) return self.calculate_weight(model, hypervisor, total_cores_used, 0, @@ -295,29 +296,30 @@ class BasicConsolidation(Strategy): :param model: the model :return: score """ - # todo(jed) inject ressource metric - metric_collector = self.get_metrics_resource_collector() - if metric_collector is None: - raise MetricCollectorNotDefined() - if model is None: raise ClusteStateNotDefined() vm = model.get_vm_from_id(vm.uuid) - instance_cpu_mean_16 = metric_collector.get_measurement( - metric='instance_cpu_percent_gauge', - aggregation_function=AggregationFunction.MEAN, - start_time="16 hours before now", - filters=["resource_id=" + vm.uuid + ""]) - if len(instance_cpu_mean_16) > 0: - cpu_capacity = model.get_resource_from_id( - ResourceType.cpu_cores).get_capacity(vm) - vm_cpu_utilization = instance_cpu_mean_16[0].value - total_cores_used = cpu_capacity * (vm_cpu_utilization / 100) - else: - raise NoDataFound("No values returned for " + str(vm.uuid) + - " instance_cpu_percent_gauge") + vm_cpu_utilization = self.ceilometer. \ + statistic_aggregation(resource_id=vm.uuid, + meter_name='cpu_util', + period="7200", + aggregate='avg' + ) + if vm_cpu_utilization is None: + LOG.error( + "No values returned for {0} cpu_util".format(vm.uuid)) + vm_cpu_utilization = 100 + + cpu_capacity = model.get_resource_from_id( + ResourceType.cpu_cores).get_capacity(vm) + + total_cores_used = cpu_capacity * (vm_cpu_utilization / 100) + + return self.calculate_weight(model, vm, total_cores_used, + 0, + 0) return self.calculate_weight(model, vm, total_cores_used, 0, @@ -327,11 +329,12 @@ class BasicConsolidation(Strategy): if model is None: raise ClusteStateNotDefined() for node_id in model.get_all_hypervisors(): - builder = node_id + " utilization " + str( - (self.calculate_score_node( - model.get_hypervisor_from_id(node_id), - model)) * 100) + " %" - LOG.debug(builder) + LOG.debug("{0} utilization {1} % ". + format(node_id, + self.calculate_score_node( + model.get_hypervisor_from_id( + node_id), + model))) def execute(self, orign_model): LOG.debug("initialize Sercon Consolidation") @@ -352,18 +355,18 @@ class BasicConsolidation(Strategy): self.compute_attempts(size_cluster) - for hypevisor_id in current_model.get_all_hypervisors(): - hypervisor = current_model.get_hypervisor_from_id(hypevisor_id) + for hypervisor_id in current_model.get_all_hypervisors(): + hypervisor = current_model.get_hypervisor_from_id(hypervisor_id) count = current_model.get_mapping(). \ - get_node_vms_from_id(hypevisor_id) + get_node_vms_from_id(hypervisor_id) if len(count) == 0: change_power = ChangePowerState(hypervisor) change_power.powerstate = PowerState.g1_S1 - change_power.set_level(StrategyLevel.conservative) + change_power.level = StrategyLevel.conservative self.solution.add_change_request(change_power) if hypervisor.state == HypervisorState.ONLINE: h = ChangeHypervisorState(hypervisor) - h.set_level(StrategyLevel.aggressive) + h.level = StrategyLevel.aggressive h.state = HypervisorState.OFFLINE self.solution.add_change_request(h) @@ -376,10 +379,11 @@ class BasicConsolidation(Strategy): score = [] ''' calculate score of nodes based on load by VMs ''' - for hypevisor_id in current_model.get_all_hypervisors(): - hypervisor = current_model.get_hypervisor_from_id(hypevisor_id) + for hypervisor_id in current_model.get_all_hypervisors(): + hypervisor = current_model.get_hypervisor_from_id( + hypervisor_id) count = current_model.get_mapping(). \ - get_node_vms_from_id(hypevisor_id) + get_node_vms_from_id(hypervisor_id) if len(count) > 0: result = self.calculate_score_node(hypervisor, current_model) @@ -387,11 +391,11 @@ class BasicConsolidation(Strategy): ''' the hypervisor has not VMs ''' result = 0 if len(count) > 0: - score.append((hypevisor_id, result)) + score.append((hypervisor_id, result)) ''' sort compute nodes by Score decreasing ''''' s = sorted(score, reverse=True, key=lambda x: (x[1])) - LOG.debug("Hypervisor(s) BFD {0}".format(str(s))) + LOG.debug("Hypervisor(s) BFD {0}".format(s)) ''' get Node to be released ''' if len(score) == 0: @@ -415,7 +419,7 @@ class BasicConsolidation(Strategy): ''' sort VM's by Score ''' v = sorted(vm_score, reverse=True, key=lambda x: (x[1])) - LOG.debug("VM(s) BFD {0}".format(str(v))) + LOG.debug("VM(s) BFD {0}".format(v)) m = 0 tmp_vm_migration_schedule = [] @@ -442,8 +446,7 @@ class BasicConsolidation(Strategy): # live migration live_migrate.set_migration_type( MigrationType.pre_copy) - live_migrate.set_level( - StrategyLevel.conservative) + live_migrate.level = StrategyLevel.conservative tmp_vm_migration_schedule.append(live_migrate) @@ -453,12 +456,11 @@ class BasicConsolidation(Strategy): # from conservative to aggressive change_power = ChangePowerState(mig_src_hypervisor) change_power.powerstate = PowerState.g1_S1 - change_power.set_level( - StrategyLevel.conservative) + change_power.level = StrategyLevel.conservative tmp_vm_migration_schedule.append(change_power) h = ChangeHypervisorState(mig_src_hypervisor) - h.set_level(StrategyLevel.aggressive) + h.level = StrategyLevel.aggressive h.state = HypervisorState.OFFLINE tmp_vm_migration_schedule.append(h) @@ -481,6 +483,6 @@ class BasicConsolidation(Strategy): "efficiency": self.efficiency } LOG.debug(infos) - self.solution.set_model(current_model) - self.solution.set_efficiency(self.efficiency) + self.solution.model = current_model + self.solution.efficiency = self.efficiency return self.solution diff --git a/watcher/decision_engine/strategies/dummy_strategy.py b/watcher/decision_engine/strategies/dummy_strategy.py index 14a83870e..085d7b95e 100644 --- a/watcher/decision_engine/strategies/dummy_strategy.py +++ b/watcher/decision_engine/strategies/dummy_strategy.py @@ -19,10 +19,13 @@ from oslo_log import log from watcher.decision_engine.api.strategy.strategy import Strategy +from watcher.decision_engine.framework.meta_actions.nop import Nop LOG = log.getLogger(__name__) class DummyStrategy(Strategy): def execute(self, model): - return self.get_solution() + n = Nop() + self.solution.add_change_request(n) + return self.solution diff --git a/watcher/metrics_engine/api/metrics_resource_collector.py b/watcher/metrics_engine/api/metrics_resource_collector.py deleted file mode 100644 index 42afb4763..000000000 --- a/watcher/metrics_engine/api/metrics_resource_collector.py +++ /dev/null @@ -1,65 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import abc -from enum import Enum -import six - - -class AggregationFunction(Enum): - MEAN = 'mean' - COUNT = 'count' - - -class Measure(object): - def __init__(self, time, value): - self.time = time - self.value = value - - def __str__(self): - return str(self.time) + " " + str(self.value) - - -@six.add_metaclass(abc.ABCMeta) -class MetricsResourceCollector(object): - @abc.abstractmethod - def get_measurement(self, - metric, - callback=None, - start_time=None, - end_time=None, - filters=None, - aggregation_function=None, - intervals=None): - """ - - :param metric: The full name of a metric in the system. - Must be the complete name. Case sensitive - :param callback: Asynchronous Callback Functions to live retrev - :param start_time:Starting time for the query. - This may be an absolute or relative time. - :param end_time: An end time for the query. - If the end time is not supplied, the current time - on the TSD will be used. - :param filters: An optional set of tags for filtering or grouping - :param aggregation_function: A mathematical function - :param intervals: An optional interval and function - to reduce the number of data points returned - :return: - """ - raise NotImplementedError("Should have implemented this") diff --git a/watcher/metrics_engine/api/__init__.py b/watcher/metrics_engine/cluster_history/__init__.py similarity index 100% rename from watcher/metrics_engine/api/__init__.py rename to watcher/metrics_engine/cluster_history/__init__.py diff --git a/watcher/metrics_engine/cluster_history/api.py b/watcher/metrics_engine/cluster_history/api.py new file mode 100644 index 000000000..0c6e593d5 --- /dev/null +++ b/watcher/metrics_engine/cluster_history/api.py @@ -0,0 +1,44 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Authors: Jean-Emile DARTOIS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import abc +import six + +""" Work in progress Helper to query metrics """ + + +@six.add_metaclass(abc.ABCMeta) +class BaseClusterHistory(object): + @abc.abstractmethod + def statistic_aggregation(self, resource_id, meter_name, period, + aggregate='avg'): + raise NotImplementedError( + "Should have implemented this") # pragma: nocover + + @abc.abstractmethod + def get_last_sample_values(self, resource_id, meter_name, limit=1): + raise NotImplementedError( + "Should have implemented this") # pragma: nocover + + def query_sample(self, meter_name, query, limit=1): + raise NotImplementedError( + "Should have implemented this") # pragma: nocover + + def statistic_list(self, meter_name, query=None, period=None): + raise NotImplementedError( + "Should have implemented this") # pragma: nocover diff --git a/watcher/metrics_engine/cluster_history/ceilometer.py b/watcher/metrics_engine/cluster_history/ceilometer.py new file mode 100644 index 000000000..601b57cee --- /dev/null +++ b/watcher/metrics_engine/cluster_history/ceilometer.py @@ -0,0 +1,49 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Authors: Jean-Emile DARTOIS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from oslo_config import cfg +from oslo_log import log +from watcher.common.ceilometer import Client + +from watcher.metrics_engine.cluster_history.api import BaseClusterHistory + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class CeilometerClusterHistory(BaseClusterHistory): + def __init__(self): + self.ceilometer = Client() + + def statistic_list(self, meter_name, query=None, period=None): + return self.ceilometer.statistic_list(meter_name, query, period) + + def query_sample(self, meter_name, query, limit=1): + return self.ceilometer.query_sample(meter_name, query, limit) + + def get_last_sample_values(self, resource_id, meter_name, limit=1): + return self.ceilometer.get_last_sample_values(resource_id, meter_name, + limit) + + def statistic_aggregation(self, resource_id, meter_name, period, + aggregate='avg'): + return self.ceilometer.statistic_aggregation(resource_id, meter_name, + period, + aggregate) diff --git a/watcher/metrics_engine/framework/__init__.py b/watcher/metrics_engine/cluster_model_collector/__init__.py similarity index 100% rename from watcher/metrics_engine/framework/__init__.py rename to watcher/metrics_engine/cluster_model_collector/__init__.py diff --git a/watcher/metrics_engine/api/cluster_state_collector.py b/watcher/metrics_engine/cluster_model_collector/api.py similarity index 80% rename from watcher/metrics_engine/api/cluster_state_collector.py rename to watcher/metrics_engine/cluster_model_collector/api.py index 77c8008be..e60228337 100644 --- a/watcher/metrics_engine/api/cluster_state_collector.py +++ b/watcher/metrics_engine/cluster_model_collector/api.py @@ -21,8 +21,8 @@ import six @six.add_metaclass(abc.ABCMeta) -class ClusterStateCollector(object): - +class BaseClusterModelCollector(object): @abc.abstractmethod - def get_latest_state_cluster(self): - raise NotImplementedError("Should have implemented this") + def get_latest_cluster_data_model(self): + raise NotImplementedError( + "Should have implemented this") # pragma: nocover diff --git a/watcher/tests/applier/framework/test_manager.py b/watcher/metrics_engine/cluster_model_collector/manager.py similarity index 52% rename from watcher/tests/applier/framework/test_manager.py rename to watcher/metrics_engine/cluster_model_collector/manager.py index ff876851d..f024bcd4d 100644 --- a/watcher/tests/applier/framework/test_manager.py +++ b/watcher/metrics_engine/cluster_model_collector/manager.py @@ -17,18 +17,21 @@ # limitations under the License. # +from oslo_config import cfg +from oslo_log import log -from watcher.decision_engine.framework.manager_decision_engine import \ - DecisionEngineManager +from watcher.applier.framework.command.wrapper.nova_wrapper import NovaWrapper +from watcher.common.keystone import Client +from watcher.metrics_engine.cluster_model_collector.nova import \ + NovaClusterModelCollector -from watcher.tests import base +LOG = log.getLogger(__name__) +CONF = cfg.CONF -class TestApplierdManager(base.TestCase): - manager = DecisionEngineManager() - - def setUp(self): - super(TestApplierdManager, self).setUp() - - def test_event_receive(self): - pass +class CollectorManager(object): + def get_cluster_model_collector(self): + keystone = Client() + wrapper = NovaWrapper(keystone.get_credentials(), + session=keystone.get_session()) + return NovaClusterModelCollector(wrapper=wrapper) diff --git a/watcher/metrics_engine/framework/statedb_collector.py b/watcher/metrics_engine/cluster_model_collector/nova.py similarity index 89% rename from watcher/metrics_engine/framework/statedb_collector.py rename to watcher/metrics_engine/cluster_model_collector/nova.py index 4d00fac09..f9ad3e23f 100644 --- a/watcher/metrics_engine/framework/statedb_collector.py +++ b/watcher/metrics_engine/cluster_model_collector/nova.py @@ -26,18 +26,18 @@ from watcher.decision_engine.framework.model.model_root import ModelRoot from watcher.decision_engine.framework.model.resource import Resource from watcher.decision_engine.framework.model.resource import ResourceType from watcher.decision_engine.framework.model.vm import VM -from watcher.metrics_engine.api.cluster_state_collector import \ - ClusterStateCollector +from watcher.metrics_engine.cluster_model_collector.api import \ + BaseClusterModelCollector CONF = cfg.CONF LOG = log.getLogger(__name__) -class NovaCollector(ClusterStateCollector): +class NovaClusterModelCollector(BaseClusterModelCollector): def __init__(self, wrapper): self.wrapper = wrapper - def get_latest_state_cluster(self): + def get_latest_cluster_data_model(self): cluster = ModelRoot() mem = Resource(ResourceType.memory) @@ -51,7 +51,7 @@ class NovaCollector(ClusterStateCollector): hypervisors = self.wrapper.get_hypervisors_list() for h in hypervisors: service = self.wrapper.nova.services.find(id=h.service['id']) - # create hypervisor in stateDB + # create hypervisor in cluster_model_collector hypervisor = Hypervisor() hypervisor.uuid = service.host # set capacity @@ -63,7 +63,7 @@ class NovaCollector(ClusterStateCollector): cluster.add_hypervisor(hypervisor) vms = self.wrapper.get_vms_by_hypervisor(str(service.host)) for v in vms: - # create VM in stateDB + # create VM in cluster_model_collector vm = VM() vm.uuid = v.id # nova/nova/compute/vm_states.py @@ -74,7 +74,7 @@ class NovaCollector(ClusterStateCollector): mem.set_capacity(vm, v.flavor['ram']) disk.set_capacity(vm, v.flavor['disk']) num_cores.set_capacity(vm, v.flavor['vcpus']) - # print(dir(v)) + cluster.get_mapping().map(hypervisor, vm) cluster.add_vm(vm) return cluster diff --git a/watcher/metrics_engine/framework/datasources/__init__.py b/watcher/metrics_engine/framework/datasources/__init__.py deleted file mode 100644 index b5de5a85c..000000000 --- a/watcher/metrics_engine/framework/datasources/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -class InvalidQuery(Exception): - pass diff --git a/watcher/metrics_engine/framework/datasources/influxdb_collector.py b/watcher/metrics_engine/framework/datasources/influxdb_collector.py deleted file mode 100644 index 99467a5de..000000000 --- a/watcher/metrics_engine/framework/datasources/influxdb_collector.py +++ /dev/null @@ -1,169 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from concurrent.futures import ThreadPoolExecutor -import datetime -import parsedatetime - -from influxdb import InfluxDBClient -from oslo_config import cfg -from oslo_log import log -from watcher.metrics_engine.api.metrics_resource_collector import \ - AggregationFunction -from watcher.metrics_engine.api.metrics_resource_collector import Measure -from watcher.metrics_engine.api.metrics_resource_collector import \ - MetricsResourceCollector -from watcher.metrics_engine.framework.datasources.sql_ast.build_db_query import \ - DBQuery -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import And -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import \ - Condition - -LOG = log.getLogger(__name__) -CONF = cfg.CONF - -WATCHER_INFLUXDB_COLLECTOR_OPTS = [ - cfg.StrOpt('hostname', - default='localhost', - help='The hostname to connect to InfluxDB'), - cfg.IntOpt('port', - default='8086', - help='port to connect to InfluxDB, defaults to 8086'), - cfg.StrOpt('username', - default='root', - help='user to connect, defaults to root'), - cfg.StrOpt('password', - default='root', - help='password of the user, defaults to root'), - cfg.StrOpt('database', - default='indeed', - help='database name to connect to'), - cfg.BoolOpt('param ssl', - default=False, - help='use https instead of http to connect to InfluxDB'), - cfg.IntOpt('timeout', - default='5', - help='number of seconds Requests' - 'will wait for your client to establish a connection'), - cfg.IntOpt('timeout', - default='5', - help='number of seconds Requests' - 'will wait for your client to establish a connection'), - cfg.BoolOpt('use_udp', - default=False, - help='use UDP to connect to InfluxDB'), - cfg.IntOpt('udp_port', - default='4444', - help=' UDP port to connect to InfluxDB') -] - -influxdb_collector_opt_group = cfg.OptGroup( - name='watcher_influxdb_collector', - title='Defines the parameters of the module collector') -CONF.register_group(influxdb_collector_opt_group) -CONF.register_opts(WATCHER_INFLUXDB_COLLECTOR_OPTS, - influxdb_collector_opt_group) - - -class InfluxDBCollector(MetricsResourceCollector): - def __init__(self): - self.executor = ThreadPoolExecutor(max_workers=3) - - def get_client(self): - LOG.debug("InfluxDB " + str(CONF.watcher_influxdb_collector.hostname)) - influx = InfluxDBClient(CONF.watcher_influxdb_collector.hostname, - CONF.watcher_influxdb_collector.port, - CONF.watcher_influxdb_collector.username, - CONF.watcher_influxdb_collector.password, - CONF.watcher_influxdb_collector.database) - if {u'name': u'' + CONF.watcher_influxdb_collector.database + ''} not \ - in influx.get_list_database(): - raise Exception("The selected database does not exist" - "or the user credentials supplied are wrong") - return influx - - def convert(self, time): - cal = parsedatetime.Calendar() - time_struct, result = cal.parse(time) - return datetime.datetime(*time_struct[:6]).ctime() - - def build_query(self, - measurement, - start_time=None, - end_time=None, - filters=None, - aggregation_function=None, - intervals=None): - query = DBQuery(measurement) - conditions = [] - if start_time is not None: - c = Condition('time', '>', self.convert(start_time)) - conditions.append(c) - if end_time is not None: - c = Condition('time', '>', self.convert(end_time)) - conditions.append(c) - if filters is not None: - for f in filters: - c = Condition(f.split('=')[0], '=', f.split('=')[1]) - conditions.append(c) - if aggregation_function is not None: - if aggregation_function == AggregationFunction.MEAN: - query.select("mean(value)") - elif aggregation_function == AggregationFunction.COUNT: - query.select("count(value)") - - if intervals is not None: - query.groupby("time(" + str(intervals) + ")") - - if len(conditions) == 1: - query.where(conditions[0]) - elif len(conditions) != 0: - _where = And(conditions[0], conditions[1]) - for i in range(2, len(conditions)): - _where = And(_where, conditions[i]) - query.where(_where) - LOG.debug(query) - return query - - def get_measurement(self, - metric, - callback=None, - start_time=None, - end_time=None, - filters=None, - aggregation_function=None, - intervals=None): - results = [] - client = self.get_client() - query = self.build_query(metric, start_time, end_time, filters, - aggregation_function, intervals) - - results_from_influx = client.query(query) - - for item in results_from_influx[None]: - time = item.get('time', None) - for field in ['value', 'count', 'min', 'max', 'mean']: - value = item.get(field, None) - if value is not None: - row = Measure(time, value) - if callback is not None: - self.executor.submit(callback, row) - else: - results.append(row) - return results diff --git a/watcher/metrics_engine/framework/datasources/sql_ast/__init__.py b/watcher/metrics_engine/framework/datasources/sql_ast/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/metrics_engine/framework/datasources/sql_ast/build_db_query.py b/watcher/metrics_engine/framework/datasources/sql_ast/build_db_query.py deleted file mode 100644 index 0e805ddcc..000000000 --- a/watcher/metrics_engine/framework/datasources/sql_ast/build_db_query.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import From -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import \ - GroupBy -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import Limit -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import List -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import Select -from watcher.metrics_engine.framework.datasources.sql_ast.sql_ast import Where - - -class DBQuery(object): - def __init__(self, _from): - self._select = Select(_from) - self.inline = False - - def select_from(self, _from): - self._select._from = From(_from) - return self - - def where(self, where): - self._select.where = Where(where) - return self - - def groupby(self, g): - self._select.groupby = GroupBy(g) - return self - - def limit(self, limit): - self._select.limit = Limit(limit) - return self - - def select(self, *args): - self._select.what = List(*args) - return self - - def __str__(self): - self._select.inline = self.inline - s = str(self._select) - if not self.inline: - s += ';' - return s diff --git a/watcher/metrics_engine/framework/datasources/sql_ast/sql_ast.py b/watcher/metrics_engine/framework/datasources/sql_ast/sql_ast.py deleted file mode 100644 index 9a2de862b..000000000 --- a/watcher/metrics_engine/framework/datasources/sql_ast/sql_ast.py +++ /dev/null @@ -1,157 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -class ASTNode(object): - visited = False - inline = False - - def __init__(self): - pass - - def children(self): - for c in self._children: - yield c - - def __str__(self): - pass - - -class Condition(ASTNode): - def __init__(self, what, operator, _on): - self.what = what - self._on = "'" + str(_on) + "'" - self.operator = operator - - def __str__(self): - s = self.what + ' ' + self.operator + ' ' + self._on - if self.inline: - s = '' + s + '' - return s - - -class BinaryNode(ASTNode): - def __init__(self, left, right): - self.left = left - self.right = right - - def __str__(self): - return '({left} {middle} {right})'.format( - left=self.left, - middle=self.middle, - right=self.right - ) - - -class And(BinaryNode): - middle = 'AND' - - -class Or(BinaryNode): - middle = 'OR' - - -class List(ASTNode): - def __init__(self, *args): - for arg in args: - if hasattr(arg, 'inline'): - arg.inline = True - self.items = args - - def __str__(self): - lst = ', '.join(map(lambda x: str(x), self.items)) - if self.inline: - lst = '(' + lst + ')' - return lst - - -class Set(ASTNode): - def __init__(self, **kwargs): - self.items = kwargs - - def __str__(self): - return ', '.join( - ['{0}={1}'.format(key, val) for key, val in self.items.items()]) - - -class Returning(ASTNode): - def __init__(self, _list='*'): - self._list = _list - - def __str__(self): - return 'RETURNING {_list}'.format(_list=self._list) - - -class Limit(ASTNode): - def __init__(self, limit): - if hasattr(limit, 'inline'): - limit.inline = True - self.limit = limit - - def __str__(self): - return " LIMIT " + str(self.limit) - - -class Where(ASTNode): - def __init__(self, logic): - if hasattr(logic, 'inline'): - logic.inline = True - self.logic = logic - - def __str__(self): - return "WHERE " + str(self.logic) - - -class GroupBy(ASTNode): - def __init__(self, logic): - if hasattr(logic, 'inline'): - logic.inline = True - self.logic = logic - - def __str__(self): - return " group by " + str(self.logic) - - -class From(ASTNode): - def __init__(self, _from): - if hasattr(_from, 'inline'): - _from.inline = True - self._from = _from - - def __str__(self): - return 'FROM {_from}'.format(_from=self._from) - - -class Select(ASTNode): - def __init__(self, _from, what='*', where='', groupby='', - limit=''): - self._from = "\"" + _from + "\"" - self.what = what - self.where = where and Where(where) - self.groupby = groupby - self.limit = limit and Limit(limit) - self.inlint = False - - def __str__(self): - s = 'SELECT ' + str(self.what) + ' FROM ' + str( - self._from) + ' ' + str(self.where) + str(self.groupby) + str( - self.limit) - if self.inline: - s = '(' + s + ')' - return s diff --git a/watcher/opts.py b/watcher/opts.py index ec9f25132..75bc62eac 100644 --- a/watcher/opts.py +++ b/watcher/opts.py @@ -22,12 +22,9 @@ import watcher.api.app from watcher.applier.framework import manager_applier import watcher.common.messaging.messaging_core - -from watcher.decision_engine.framework import manager_decision_engine +from watcher.decision_engine.framework import manager from watcher.decision_engine.framework.strategy import strategy_loader from watcher.decision_engine.framework.strategy import strategy_selector -from watcher.metrics_engine.framework import collector_manager -from watcher.metrics_engine.framework.datasources import influxdb_collector def list_opts(): @@ -44,11 +41,7 @@ def list_opts(): ('watcher_strategies', strategy_loader.WATCHER_STRATEGY_OPTS), ('watcher_goals', strategy_selector.WATCHER_GOALS_OPTS), ('watcher_decision_engine', - manager_decision_engine.WATCHER_DECISION_ENGINE_OPTS), + manager.WATCHER_DECISION_ENGINE_OPTS), ('watcher_applier', - manager_applier.APPLIER_MANAGER_OPTS), - ('watcher_influxdb_collector', - influxdb_collector.WATCHER_INFLUXDB_COLLECTOR_OPTS), - ('watcher_metrics_collector', - collector_manager.WATCHER_METRICS_COLLECTOR_OPTS) + manager_applier.APPLIER_MANAGER_OPTS) ] diff --git a/watcher/tests/cmd/test_decision_engine.py b/watcher/tests/cmd/test_decision_engine.py index b7efb0ee0..c7614eb75 100644 --- a/watcher/tests/cmd/test_decision_engine.py +++ b/watcher/tests/cmd/test_decision_engine.py @@ -21,11 +21,10 @@ import types from mock import patch from oslo_config import cfg +from watcher.decision_engine.framework.manager import DecisionEngineManager from watcher.tests.base import TestCase from watcher.cmd import decisionengine -from watcher.decision_engine.framework.manager_decision_engine import \ - DecisionEngineManager class TestDecisionEngine(TestCase): diff --git a/watcher/tests/collector/test_influxdb.py b/watcher/tests/collector/test_influxdb.py deleted file mode 100644 index f4a02f9a6..000000000 --- a/watcher/tests/collector/test_influxdb.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import mock -from watcher.metrics_engine.api.metrics_resource_collector import \ - AggregationFunction -from watcher.metrics_engine.framework.datasources.influxdb_collector import \ - InfluxDBCollector - -from watcher.tests import base - - -class TestInfluxDB(base.TestCase): - def get_databases(self): - return {'name': 'indeed'} - - def test_get_measurement(self): - influx = InfluxDBCollector() - influx.get_client = mock.MagicMock() - influx.get_client.get_list_database = self.get_databases - result = influx.get_measurement("") - self.assertEqual(result, []) - - def test_build_query(self): - influx = InfluxDBCollector() - influx.get_client = mock.MagicMock() - query = influx.build_query("cpu_compute") - self.assertEqual(str(query), "SELECT * FROM \"cpu_compute\" ;") - - def test_build_query_aggregate(self): - influx = InfluxDBCollector() - influx.get_client = mock.MagicMock() - query = influx.build_query("cpu_compute", - aggregation_function=AggregationFunction. - COUNT) - self.assertEqual(str(query), - "SELECT count(value) FROM \"cpu_compute\" ;") - - def test_build_query_aggregate_intervals(self): - influx = InfluxDBCollector() - influx.get_client = mock.MagicMock() - query = influx.build_query("cpu_compute", - aggregation_function=AggregationFunction. - COUNT, - intervals="5m") - self.assertEqual(str(query), - "SELECT count(value) FROM \"cpu_compute\" " - "group by time(5m);") - - def test_build_query_aggregate_filters(self): - influx = InfluxDBCollector() - influx.get_client = mock.MagicMock() - filters = ['host=server1'] - query = influx.build_query("cpu_compute", - aggregation_function=AggregationFunction. - COUNT, - intervals="5m", - filters=filters) - self.assertEqual(str(query), 'SELECT count(value) FROM' - ' \"cpu_compute" WHERE' - ' host = \'server1\' group by time(5m);') - - def test_get_qusurement_start(self): - influx = InfluxDBCollector() - influx.get_client = mock.MagicMock() - influx.get_client.get_list_database = self.get_databases - result = influx.get_measurement("cpu_compute", start_time='now', - end_time="now") - self.assertEqual(result, []) diff --git a/watcher/tests/collector/test_nova_collector.py b/watcher/tests/collector/test_nova_collector.py index 31ea3eac5..568f0f2f8 100644 --- a/watcher/tests/collector/test_nova_collector.py +++ b/watcher/tests/collector/test_nova_collector.py @@ -19,8 +19,9 @@ import mock -from watcher.metrics_engine.framework.statedb_collector import NovaCollector +from watcher.metrics_engine.cluster_model_collector.nova import \ + NovaClusterModelCollector from watcher.tests import base @@ -29,7 +30,7 @@ class TestNovaCollector(base.TestCase): def setUp(self, mock_ksclient): super(TestNovaCollector, self).setUp() self.wrapper = mock.MagicMock() - self.nova_collector = NovaCollector(self.wrapper) + self.nova_collector = NovaClusterModelCollector(self.wrapper) def test_nova_collector(self): hypervisor = mock.Mock() @@ -39,5 +40,5 @@ class TestNovaCollector(base.TestCase): service.host = "" self.wrapper.get_hypervisors_list.return_value = {hypervisor} self.wrapper.nova.services.find.get.return_value = service - model = self.nova_collector.get_latest_state_cluster() + model = self.nova_collector.get_latest_cluster_data_model() self.assertIsNotNone(model) diff --git a/watcher/tests/collector/test_query.py b/watcher/tests/collector/test_query.py deleted file mode 100644 index 1de54e2de..000000000 --- a/watcher/tests/collector/test_query.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from watcher.metrics_engine.framework.datasources.sql_ast.build_db_query import \ - DBQuery - -from watcher.tests import base - - -class TestDBQuery(base.TestCase): - - def test_query(self): - expected = "SELECT * FROM \"cpu_compute.cpu.user.percent_gauge\" ;" - query = DBQuery("cpu_compute.cpu.user.percent_gauge") - self.assertEqual(str(query), expected) - - def test_query_where(self): - expected = "SELECT * FROM" \ - " \"cpu_compute.cpu.user.percent_gauge\" WHERE host=jed;" - query = DBQuery("cpu_compute.cpu.user.percent_gauge").where( - "host=jed") - self.assertEqual(str(query), expected) - - def test_query_filter(self): - expected = "SELECT mean(value) FROM" \ - " \"cpu_compute.cpu.user.percent_gauge\" WHERE host=jed;" - query = DBQuery("cpu_compute.cpu.user.percent_gauge").where( - "host=jed").select("mean(value)") - self.assertEqual(str(query), expected) - - def test_query_groupby(self): - expected = "SELECT * FROM" \ - " \"cpu_compute.cpu.user.percent_gauge\" " \ - "group by time(5m);" - query = DBQuery("cpu_compute.cpu.user.percent_gauge").groupby( - "time(5m)") - self.assertEqual(str(query), expected) diff --git a/watcher/tests/decision_engine/api/messaging/__init__.py b/watcher/tests/decision_engine/api/messaging/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/tests/decision_engine/api/messaging/test_decision_engine_command.py b/watcher/tests/decision_engine/api/messaging/test_decision_engine_command.py deleted file mode 100755 index 38c97ba76..000000000 --- a/watcher/tests/decision_engine/api/messaging/test_decision_engine_command.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.messaging.decision_engine_command import \ - DecisionEngineCommand -from watcher.tests import base - - -class TestDecisionEngineCommand(base.TestCase): - def test_execute(self): - DEC = DecisionEngineCommand() - self.assertRaises(NotImplementedError, DEC.execute) diff --git a/watcher/tests/decision_engine/api/messaging/test_event_consumer.py b/watcher/tests/decision_engine/api/messaging/test_event_consumer.py deleted file mode 100644 index 029645ac0..000000000 --- a/watcher/tests/decision_engine/api/messaging/test_event_consumer.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.messaging.event_consumer import EventConsumer -from watcher.tests import base - - -class TestEventConsumer(base.TestCase): - def test_set_messaging(self): - messaging = "test message" - EC = EventConsumer() - EC.set_messaging(messaging) - self.assertEqual(EC.messaging, messaging) - - def test_execute(self): - EC = EventConsumer() - self.assertRaises(NotImplementedError, EC.execute, None, None, None) diff --git a/watcher/tests/decision_engine/api/planner/__init__.py b/watcher/tests/decision_engine/api/planner/__init__.py deleted file mode 100755 index e69de29bb..000000000 diff --git a/watcher/tests/decision_engine/api/planner/test_planner.py b/watcher/tests/decision_engine/api/planner/test_planner.py deleted file mode 100755 index a95a59f5d..000000000 --- a/watcher/tests/decision_engine/api/planner/test_planner.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.planner.planner import Planner -from watcher.tests import base - - -class TestPlanner(base.TestCase): - def test_schedule(self): - pl = Planner() - self.assertRaises(NotImplementedError, pl.schedule, None, None, None) diff --git a/watcher/tests/decision_engine/api/solution/__init__.py b/watcher/tests/decision_engine/api/solution/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/tests/decision_engine/api/solution/test_solution.py b/watcher/tests/decision_engine/api/solution/test_solution.py deleted file mode 100644 index fa57fa152..000000000 --- a/watcher/tests/decision_engine/api/solution/test_solution.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -'''from watcher.decision_engine.api.solution.solution import Solution -from watcher.decision_engine.framework.model.model_root import ModelRoot -from watcher.tests import base - -class TestSolution(base.TestCase): - def test_get_model(self): - sol = Solution() - current_model = - sol.set_model(current_model) -''' diff --git a/watcher/tests/decision_engine/api/solution/test_solution_comparator.py b/watcher/tests/decision_engine/api/solution/test_solution_comparator.py deleted file mode 100755 index bb119fd60..000000000 --- a/watcher/tests/decision_engine/api/solution/test_solution_comparator.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.solution.solution import Solution as sol -from watcher.decision_engine.api.solution.solution_comparator import Solution -from watcher.tests import base - - -class test_Solution_Comparator(base.TestCase): - def test_compare(self): - sol1 = sol() - sol2 = sol() - solution_comparator = Solution() - self.assertRaises(NotImplementedError, - solution_comparator.compare, - sol1, - sol2) diff --git a/watcher/tests/decision_engine/api/solution/test_solution_evaluator.py b/watcher/tests/decision_engine/api/solution/test_solution_evaluator.py deleted file mode 100755 index 8fb393049..000000000 --- a/watcher/tests/decision_engine/api/solution/test_solution_evaluator.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.solution.solution_evaluator import \ - SolutionEvaluator -from watcher.tests import base - - -class TestSolutionEvaluator(base.TestCase): - def test_evaluate(self): - SE = SolutionEvaluator() - self.assertRaises(NotImplementedError, SE.evaluate, None) diff --git a/watcher/tests/decision_engine/api/strategy/test_meta_action.py b/watcher/tests/decision_engine/api/strategy/test_meta_action.py index 77c64929b..f8859b7f3 100644 --- a/watcher/tests/decision_engine/api/strategy/test_meta_action.py +++ b/watcher/tests/decision_engine/api/strategy/test_meta_action.py @@ -20,11 +20,11 @@ from watcher.tests import base class TestMetaAction(base.TestCase): def test_get_priority(self): - MA = MetaAction() - MA.set_priority(3) - self.assertEqual(MA.get_priority(), 3) + ma = MetaAction() + ma.priority = 3 + self.assertEqual(ma.priority, 3) def test_get_level(self): - MA = MetaAction() - MA.set_level(5) - self.assertEqual(MA.get_level(), 5) + ma = MetaAction() + ma.level = 5 + self.assertEqual(ma.level, 5) diff --git a/watcher/tests/decision_engine/api/strategy/test_selector.py b/watcher/tests/decision_engine/api/strategy/test_selector.py deleted file mode 100755 index 2d01d363c..000000000 --- a/watcher/tests/decision_engine/api/strategy/test_selector.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.strategy.selector import Selector -from watcher.tests import base - - -class TestSelector(base.TestCase): - def test_define_from_goal(self): - Sel = Selector() - self.assertRaises(NotImplementedError, Sel.define_from_goal, None) diff --git a/watcher/tests/decision_engine/api/strategy/test_strategy_context.py b/watcher/tests/decision_engine/api/strategy/test_strategy_context.py deleted file mode 100644 index bf020f6c3..000000000 --- a/watcher/tests/decision_engine/api/strategy/test_strategy_context.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.api.strategy.strategy_context import \ - StrategyContext -from watcher.tests import base - - -class TestStrategyContext(base.TestCase): - def test_execute_strategy(self): - SC = StrategyContext() - self.assertRaises(NotImplementedError, SC.execute_strategy, None) diff --git a/watcher/tests/decision_engine/faker_cluster_state.py b/watcher/tests/decision_engine/faker_cluster_state.py index 124e9e7d9..3e7098118 100644 --- a/watcher/tests/decision_engine/faker_cluster_state.py +++ b/watcher/tests/decision_engine/faker_cluster_state.py @@ -24,15 +24,15 @@ from watcher.decision_engine.framework.model.model_root import ModelRoot from watcher.decision_engine.framework.model.resource import Resource from watcher.decision_engine.framework.model.resource import ResourceType from watcher.decision_engine.framework.model.vm import VM -from watcher.metrics_engine.api.cluster_state_collector import \ - ClusterStateCollector +from watcher.metrics_engine.cluster_model_collector.api import \ + BaseClusterModelCollector -class FakerStateCollector(ClusterStateCollector): +class FakerModelCollector(BaseClusterModelCollector): def __init__(self): pass - def get_latest_state_cluster(self): + def get_latest_cluster_data_model(self): return self.generate_scenario_1() def generate_random(self, count_nodes, number_of_vm_per_node): @@ -57,7 +57,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.create_resource(disk) for i in range(0, count_node): - node_uuid = "Node_" + str(i) + node_uuid = "Node_{0}".format(i) hypervisor = Hypervisor() hypervisor.uuid = node_uuid mem.set_capacity(hypervisor, 132) @@ -67,7 +67,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.add_hypervisor(hypervisor) for i in range(0, count_vm): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) vm = VM() vm.uuid = vm_uuid # print("create "+str(vm)) @@ -108,7 +108,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.create_resource(disk) for i in range(0, count_node): - node_uuid = "Node_" + str(i) + node_uuid = "Node_{0}".format(i) node = Hypervisor() node.uuid = node_uuid @@ -119,7 +119,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.add_hypervisor(node) for i in range(0, count_vm): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) vm = VM() vm.uuid = vm_uuid # print("create "+str(vm)) @@ -179,7 +179,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.create_resource(disk) for i in range(0, count_node): - node_uuid = "Node_" + str(i) + node_uuid = "Node_{0}".format(i) node = Hypervisor() node.uuid = node_uuid mem.set_capacity(node, 132) @@ -216,7 +216,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.create_resource(disk) for i in range(0, count_node): - node_uuid = "Node_" + str(i) + node_uuid = "Node_{0}".format(i) node = Hypervisor() node.uuid = node_uuid mem.set_capacity(node, 132) @@ -226,7 +226,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.add_hypervisor(node) for i in range(0, count_vm): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) vm = VM() vm.uuid = vm_uuid # print("create "+str(vm)) @@ -237,21 +237,21 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.add_vm(vm) indice = 0 for j in range(0, 2): - node_uuid = "Node_" + str(j) + node_uuid = "Node_{0}".format(j) for i in range(indice, 3): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) self.map(current_state_cluster, node_uuid, vm_uuid) for j in range(2, 5): - node_uuid = "Node_" + str(j) + node_uuid = "Node_{0}".format(j) for i in range(indice, 4): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) self.map(current_state_cluster, node_uuid, vm_uuid) for j in range(5, 10): - node_uuid = "Node_" + str(j) + node_uuid = "Node_{0}".format(j) for i in range(indice, 4): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) self.map(current_state_cluster, node_uuid, vm_uuid) return current_state_cluster @@ -278,7 +278,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.create_resource(disk) for i in range(0, count_node): - node_uuid = "Node_" + str(i) + node_uuid = "Node_{0}".format(i) node = Hypervisor() node.uuid = node_uuid @@ -289,7 +289,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.add_hypervisor(node) for i in range(0, count_vm): - vm_uuid = "VM_" + str(i) + vm_uuid = "VM_{0}".format(i) vm = VM() vm.uuid = vm_uuid # print("create "+str(vm)) @@ -325,7 +325,7 @@ class FakerStateCollector(ClusterStateCollector): current_state_cluster.create_resource(disk) for i in range(0, count_node): - node_uuid = "Node_" + str(i) + node_uuid = "Node_{0}".format(i) node = Hypervisor() node.uuid = node_uuid diff --git a/watcher/tests/decision_engine/faker_metrics_collector.py b/watcher/tests/decision_engine/faker_metrics_collector.py index 6a73bf50b..553d1f276 100644 --- a/watcher/tests/decision_engine/faker_metrics_collector.py +++ b/watcher/tests/decision_engine/faker_metrics_collector.py @@ -19,93 +19,22 @@ import random -from watcher.metrics_engine.api.metrics_resource_collector import Measure -from watcher.metrics_engine.api.metrics_resource_collector import \ - MetricsResourceCollector - -class FakerMetricsCollector(MetricsResourceCollector): +class FakerMetricsCollector(object): def __init__(self): self.emptytype = "" def empty_one_metric(self, emptytype): self.emptytype = emptytype - def get_measurement(self, - metric, - callback=None, - start_time=None, - end_time=None, - filters=None, - aggregation_function=None, - intervals=None): - - results = [] - if metric == "compute_cpu_user_percent_gauge": - if self.emptytype == "CPU_COMPUTE": - pass - else: - results.append(Measure(0, 5)) - elif metric == "instance_cpu_percent_gauge": - results.append( - self.get_average_usage_vm_cpu(filters[0].split('=')[1])) - elif metric == "instance_memory_resident_used_bytes_gauge": - results.append( - self.get_average_usage_vm_memory(filters[0].split('=')[1])) - elif metric == "instance_disk_used_bytes_gauge": - if self.emptytype == "DISK_COMPUTE": - pass - else: - results.append( - self.get_average_usage_vm_disk(filters[0].split('=')[1])) - elif metric == "compute_memory_used_bytes_gauge": - if self.emptytype == "MEM_COMPUTE": - pass - else: - results.append(self.get_usage_node_cpu( - filters[0].split('=')[1])) - elif metric == "compute_disk_size_used_bytes_gauge": - if self.emptytype == "DISK_COMPUTE": - pass - else: - results.append(self.get_usage_node_disk( - filters[0].split('=')[1])) - else: - results.append(Measure(0, 0)) - return results - - def get_usage_node_disk(self, uuid): - """The last VM CPU usage values to average - - :param uuid:00 - :return: - """ - # query influxdb stream - - # compute in stream - - # Normalize - mock = {} - # node 0 - mock['Node_0'] = Measure(0, 7) - mock['Node_1'] = Measure(0, 100) - # node 1 - mock['Node_2'] = Measure(0, 10) - # node 2 - mock['Node_3'] = Measure(0, 5) - mock['Node_4'] = Measure(0, 5) - mock['Node_5'] = Measure(0, 10) - - # node 3 - mock['Node_6'] = Measure(0, 8) - - # node 4 - mock['VM_7'] = Measure(0, 4) - if uuid not in mock.keys(): - # mock[uuid] = random.randint(1, 4) - mock[uuid] = Measure(0, 8) - - return mock[str(uuid)] + def mock_get_statistics(self, resource_id, meter_name, period, + aggregate='avg'): + result = 0 + if meter_name == "compute.node.cpu.percent": + result = self.get_usage_node_cpu(resource_id) + elif meter_name == "cpu_util": + result = self.get_average_usage_vm_cpu(resource_id) + return result def get_usage_node_cpu(self, uuid): """The last VM CPU usage values to average @@ -120,25 +49,26 @@ class FakerMetricsCollector(MetricsResourceCollector): # Normalize mock = {} # node 0 - mock['Node_0'] = Measure(0, 7) - mock['Node_1'] = Measure(0, 7) + mock['Node_0'] = 7 + mock['Node_1'] = 7 # node 1 - mock['Node_2'] = Measure(0, 80) + mock['Node_2'] = 80 # node 2 - mock['Node_3'] = Measure(0, 5) - mock['Node_4'] = Measure(0, 5) - mock['Node_5'] = Measure(0, 10) + mock['Node_3'] = 5 + mock['Node_4'] = 5 + mock['Node_5'] = 10 # node 3 - mock['Node_6'] = Measure(0, 8) - + mock['Node_6'] = 8 + mock['Node_19'] = 10 # node 4 - mock['VM_7'] = Measure(0, 4) + mock['VM_7'] = 4 + if uuid not in mock.keys(): # mock[uuid] = random.randint(1, 4) - mock[uuid] = Measure(0, 8) + mock[uuid] = 8 - return mock[str(uuid)] + return float(mock[str(uuid)]) def get_average_usage_vm_cpu(self, uuid): """The last VM CPU usage values to average @@ -153,70 +83,70 @@ class FakerMetricsCollector(MetricsResourceCollector): # Normalize mock = {} # node 0 - mock['VM_0'] = Measure(0, 7) - mock['VM_1'] = Measure(0, 7) + mock['VM_0'] = 7 + mock['VM_1'] = 7 # node 1 - mock['VM_2'] = Measure(0, 10) + mock['VM_2'] = 10 # node 2 - mock['VM_3'] = Measure(0, 5) - mock['VM_4'] = Measure(0, 5) - mock['VM_5'] = Measure(0, 10) + mock['VM_3'] = 5 + mock['VM_4'] = 5 + mock['VM_5'] = 10 # node 3 - mock['VM_6'] = Measure(0, 8) + mock['VM_6'] = 8 # node 4 - mock['VM_7'] = Measure(0, 4) + mock['VM_7'] = 4 if uuid not in mock.keys(): # mock[uuid] = random.randint(1, 4) - mock[uuid] = Measure(0, 8) + mock[uuid] = 8 return mock[str(uuid)] def get_average_usage_vm_memory(self, uuid): mock = {} # node 0 - mock['VM_0'] = Measure(0, 2) - mock['VM_1'] = Measure(0, 5) + mock['VM_0'] = 2 + mock['VM_1'] = 5 # node 1 - mock['VM_2'] = Measure(0, 5) + mock['VM_2'] = 5 # node 2 - mock['VM_3'] = Measure(0, 8) - mock['VM_4'] = Measure(0, 5) - mock['VM_5'] = Measure(0, 16) + mock['VM_3'] = 8 + mock['VM_4'] = 5 + mock['VM_5'] = 16 # node 3 - mock['VM_6'] = Measure(0, 8) + mock['VM_6'] = 8 # node 4 - mock['VM_7'] = Measure(0, 4) + mock['VM_7'] = 4 if uuid not in mock.keys(): # mock[uuid] = random.randint(1, 4) - mock[uuid] = Measure(0, 10) + mock[uuid] = 10 return mock[str(uuid)] def get_average_usage_vm_disk(self, uuid): mock = {} # node 0 - mock['VM_0'] = Measure(0, 2) - mock['VM_1'] = Measure(0, 2) + mock['VM_0'] = 2 + mock['VM_1'] = 2 # node 1 - mock['VM_2'] = Measure(0, 2) + mock['VM_2'] = 2 # node 2 - mock['VM_3'] = Measure(0, 10) - mock['VM_4'] = Measure(0, 15) - mock['VM_5'] = Measure(0, 20) + mock['VM_3'] = 10 + mock['VM_4'] = 15 + mock['VM_5'] = 20 # node 3 - mock['VM_6'] = Measure(0, 8) + mock['VM_6'] = 8 # node 4 - mock['VM_7'] = Measure(0, 4) + mock['VM_7'] = 4 if uuid not in mock.keys(): # mock[uuid] = random.randint(1, 4) - mock[uuid] = Measure(0, 4) + mock[uuid] = 4 return mock[str(uuid)] diff --git a/watcher/tests/decision_engine/framework/command/test_trigger_audit_command.py b/watcher/tests/decision_engine/framework/command/test_trigger_audit_command.py index 5a2b2f5e4..2537c93d3 100644 --- a/watcher/tests/decision_engine/framework/command/test_trigger_audit_command.py +++ b/watcher/tests/decision_engine/framework/command/test_trigger_audit_command.py @@ -23,14 +23,11 @@ from watcher.objects.audit import Audit from watcher.objects.audit import AuditStatus from watcher.tests.db.base import DbTestCase from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector -from watcher.tests.decision_engine.faker_metrics_collector import \ - FakerMetricsCollector + FakerModelCollector from watcher.tests.objects import utils as obj_utils class TestTriggerAuditCommand(DbTestCase): - def setUp(self): super(TestTriggerAuditCommand, self).setUp() self.audit_template = obj_utils.create_test_audit_template( @@ -41,33 +38,26 @@ class TestTriggerAuditCommand(DbTestCase): def test_trigger_audit_without_errors(self): try: - statedb = FakerStateCollector() - ressourcedb = FakerMetricsCollector() - command = TriggerAuditCommand(MagicMock(), statedb, ressourcedb) + model_collector = FakerModelCollector() + command = TriggerAuditCommand(MagicMock(), model_collector) command.execute(self.audit.uuid, self.context) except Exception: self.fail("The audit should be trigged without error") - def test_trigger_audit_with_errors(self): - try: - command = TriggerAuditCommand(MagicMock(), 0, 0) - command.execute(self.audit.uuid, self.context) - except Exception: - self.fail("The audit should be trigged with error") - - def test_trigger_audit_state_succes(self): - statedb = FakerStateCollector() - ressourcedb = FakerMetricsCollector() - command = TriggerAuditCommand(MagicMock(), statedb, ressourcedb) + def test_trigger_audit_state_success(self): + model_collector = FakerModelCollector() + command = TriggerAuditCommand(MagicMock(), model_collector) + command.strategy_context.execute_strategy = MagicMock() command.execute(self.audit.uuid, self.context) audit = Audit.get_by_uuid(self.context, self.audit.uuid) self.assertEqual(AuditStatus.SUCCESS, audit.state) def test_trigger_audit_send_notification(self): messaging = MagicMock() - statedb = FakerStateCollector() - ressourcedb = FakerMetricsCollector() - command = TriggerAuditCommand(messaging, statedb, ressourcedb) + model_collector = FakerModelCollector() + command = TriggerAuditCommand(messaging, model_collector) + command.strategy_context.execute_strategy = MagicMock() + command.execute(self.audit.uuid, self.context) call_on_going = call(Events.TRIGGER_AUDIT.name, { diff --git a/watcher/tests/decision_engine/framework/messaging/test_audit_endpoint.py b/watcher/tests/decision_engine/framework/messaging/test_audit_endpoint.py index 8824de58d..38d224fa5 100644 --- a/watcher/tests/decision_engine/framework/messaging/test_audit_endpoint.py +++ b/watcher/tests/decision_engine/framework/messaging/test_audit_endpoint.py @@ -15,17 +15,17 @@ # limitations under the License. import mock from mock import MagicMock + from watcher.common import utils from watcher.decision_engine.framework.command.trigger_audit_command import \ TriggerAuditCommand from watcher.decision_engine.framework.messaging.audit_endpoint import \ AuditEndpoint -from watcher.metrics_engine.framework.collector_manager import CollectorManager +from watcher.metrics_engine.cluster_model_collector.manager import \ + CollectorManager from watcher.tests import base from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector -from watcher.tests.decision_engine.faker_metrics_collector import \ - FakerMetricsCollector + FakerModelCollector class TriggerAuditCommandWithExecutor(TriggerAuditCommand): @@ -43,12 +43,11 @@ class TestAuditEndpoint(base.TestCase): def test_do_trigger_audit(self): audit_uuid = utils.generate_uuid() - statedb = FakerStateCollector() - ressourcedb = FakerMetricsCollector() - command = TriggerAuditCommand(MagicMock(), statedb, ressourcedb) + model_collector = FakerModelCollector() + command = TriggerAuditCommand(MagicMock(), model_collector) endpoint = AuditEndpoint(command) - with mock.patch.object(CollectorManager, 'get_statedb_collector') \ + with mock.patch.object(CollectorManager, 'get_cluster_model_collector') \ as mock_call2: mock_call2.return_value = 0 @@ -61,10 +60,9 @@ class TestAuditEndpoint(base.TestCase): def test_trigger_audit(self): audit_uuid = utils.generate_uuid() - statedb = FakerStateCollector() - ressourcedb = FakerMetricsCollector() + model_collector = FakerModelCollector() command = TriggerAuditCommandWithExecutor(MagicMock(), - statedb, ressourcedb) + model_collector) endpoint = AuditEndpoint(command) with mock.patch.object(TriggerAuditCommandWithExecutor, 'executor') \ diff --git a/watcher/tests/decision_engine/framework/model/test_mapping.py b/watcher/tests/decision_engine/framework/model/test_mapping.py index b902fc635..556571868 100644 --- a/watcher/tests/decision_engine/framework/model/test_mapping.py +++ b/watcher/tests/decision_engine/framework/model/test_mapping.py @@ -21,12 +21,12 @@ from watcher.decision_engine.framework.model.hypervisor import Hypervisor from watcher.decision_engine.framework.model.vm_state import VMState from watcher.tests import base from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector + FakerModelCollector class TestMapping(base.BaseTestCase): def test_get_node_from_vm(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() vms = model.get_all_vms() @@ -38,14 +38,14 @@ class TestMapping(base.BaseTestCase): self.assertEqual(node.uuid, 'Node_0') def test_get_node_from_vm_id(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() hyps = model.mapping.get_node_vms_from_id("BLABLABLA") self.assertEqual(hyps.__len__(), 0) def test_get_all_vms(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() vms = model.get_all_vms() @@ -56,7 +56,7 @@ class TestMapping(base.BaseTestCase): self.assertEqual(vms['VM_1'].uuid, 'VM_1') def test_get_mapping(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() mapping_vm = model.mapping.get_mapping_vm() @@ -65,7 +65,7 @@ class TestMapping(base.BaseTestCase): self.assertEqual(mapping_vm['VM_1'], 'Node_1') def test_migrate_vm(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() vms = model.get_all_vms() keys = vms.keys() @@ -80,12 +80,12 @@ class TestMapping(base.BaseTestCase): self.assertEqual(model.mapping.migrate_vm(vm1, hyp0, hyp1), True) def test_unmap_from_id_log_warning(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() vms = model.get_all_vms() keys = vms.keys() vm0 = vms[keys[0]] - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id @@ -94,7 +94,7 @@ class TestMapping(base.BaseTestCase): # hypervisor.uuid)), 1) def test_unmap_from_id(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() vms = model.get_all_vms() keys = vms.keys() diff --git a/watcher/tests/decision_engine/framework/model/test_model.py b/watcher/tests/decision_engine/framework/model/test_model.py index 3f92532b7..1a5401499 100644 --- a/watcher/tests/decision_engine/framework/model/test_model.py +++ b/watcher/tests/decision_engine/framework/model/test_model.py @@ -24,14 +24,14 @@ from watcher.decision_engine.framework.model.hypervisor_state import \ HypervisorState from watcher.decision_engine.framework.model.model_root import ModelRoot from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector + FakerModelCollector from watcher.tests import base class TestModel(base.BaseTestCase): def test_model(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_1() self.assertEqual(len(model._hypervisors), 5) @@ -40,7 +40,7 @@ class TestModel(base.BaseTestCase): def test_add_hypervisor(self): model = ModelRoot() - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) @@ -48,7 +48,7 @@ class TestModel(base.BaseTestCase): def test_delete_hypervisor(self): model = ModelRoot() - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) @@ -60,7 +60,7 @@ class TestModel(base.BaseTestCase): def test_get_all_hypervisors(self): model = ModelRoot() for i in range(10): - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) @@ -71,7 +71,7 @@ class TestModel(base.BaseTestCase): def test_set_get_state_hypervisors(self): model = ModelRoot() - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) @@ -95,23 +95,23 @@ class TestModel(base.BaseTestCase): # self.assert(len(model._vms)) def test_hypervisor_from_id_raise(self): model = ModelRoot() - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) - id2 = str(uuid.uuid4()) + id2 = "{0}".format(uuid.uuid4()) self.assertRaises(exception.HypervisorNotFound, model.get_hypervisor_from_id, id2) def test_remove_hypervisor_raise(self): model = ModelRoot() - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) - id2 = str(uuid.uuid4()) + id2 = "{0}".format(uuid.uuid4()) hypervisor2 = Hypervisor() hypervisor2.uuid = id2 @@ -120,7 +120,7 @@ class TestModel(base.BaseTestCase): def test_assert_hypervisor_raise(self): model = ModelRoot() - id = str(uuid.uuid4()) + id = "{0}".format(uuid.uuid4()) hypervisor = Hypervisor() hypervisor.uuid = id model.add_hypervisor(hypervisor) @@ -128,7 +128,7 @@ class TestModel(base.BaseTestCase): model.assert_hypervisor, "objet_qcq") def test_vm_from_id_raise(self): - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_1() self.assertRaises(exception.VMNotFound, model.get_vm_from_id, "valeur_qcq") diff --git a/watcher/tests/decision_engine/framework/strategy/test_strategy_loader.py b/watcher/tests/decision_engine/framework/strategy/test_strategy_loader.py index c5a311112..159bd3e61 100644 --- a/watcher/tests/decision_engine/framework/strategy/test_strategy_loader.py +++ b/watcher/tests/decision_engine/framework/strategy/test_strategy_loader.py @@ -31,7 +31,7 @@ class TestStrategySelector(base.BaseTestCase): exptected_strategy = 'basic' selected_strategy = self.strategy_loader.load(exptected_strategy) self.assertEqual( - selected_strategy.get_name(), + selected_strategy.name, exptected_strategy, 'The default strategy should be basic') diff --git a/watcher/tests/decision_engine/framework/strategy/test_strategy_manager_impl.py b/watcher/tests/decision_engine/framework/strategy/test_strategy_manager_impl.py index dab06397b..34534899d 100644 --- a/watcher/tests/decision_engine/framework/strategy/test_strategy_manager_impl.py +++ b/watcher/tests/decision_engine/framework/strategy/test_strategy_manager_impl.py @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from watcher.decision_engine.framework.strategy.StrategyManagerImpl import \ - StrategyContextImpl +from watcher.decision_engine.framework.strategy.strategy_context import \ + StrategyContext from watcher.tests import base @@ -27,6 +27,6 @@ class FakeStrategy(object): class TestStrategyContextImpl(base.BaseTestCase): def test_add_remove_strategy(self): strategy = FakeStrategy() - strategy_context = StrategyContextImpl() + strategy_context = StrategyContext() strategy_context.add_strategy(strategy) strategy_context.remove_strategy(strategy) diff --git a/watcher/tests/decision_engine/framework/test_default_planner.py b/watcher/tests/decision_engine/framework/test_default_planner.py index 87ce98fba..f6f9f0158 100644 --- a/watcher/tests/decision_engine/framework/test_default_planner.py +++ b/watcher/tests/decision_engine/framework/test_default_planner.py @@ -15,16 +15,18 @@ # limitations under the License. import mock +from mock import MagicMock from watcher.common.exception import MetaActionNotFound from watcher.common import utils from watcher.db import api as db_api from watcher.decision_engine.framework.default_planner import DefaultPlanner from watcher.decision_engine.strategies.basic_consolidation import \ BasicConsolidation + from watcher.tests.db import base from watcher.tests.db import utils as db_utils from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector + FakerModelCollector from watcher.tests.decision_engine.faker_metrics_collector import \ FakerMetricsCollector from watcher.tests.objects import utils as obj_utils @@ -34,9 +36,10 @@ class SolutionFaker(object): @staticmethod def build(): metrics = FakerMetricsCollector() - current_state_cluster = FakerStateCollector() + current_state_cluster = FakerModelCollector() sercon = BasicConsolidation("basic", "Basic offline consolidation") - sercon.set_metrics_resource_collector(metrics) + sercon.ceilometer = MagicMock( + get_statistics=metrics.mock_get_statistics) return sercon.execute(current_state_cluster.generate_scenario_1()) @@ -44,9 +47,11 @@ class SolutionFakerSingleHyp(object): @staticmethod def build(): metrics = FakerMetricsCollector() - current_state_cluster = FakerStateCollector() + current_state_cluster = FakerModelCollector() sercon = BasicConsolidation("basic", "Basic offline consolidation") - sercon.set_metrics_resource_collector(metrics) + sercon.ceilometer = MagicMock( + get_statistics=metrics.mock_get_statistics) + return sercon.execute( current_state_cluster.generate_scenario_4_with_2_hypervisors()) diff --git a/watcher/tests/decision_engine/framework/test_manager.py b/watcher/tests/decision_engine/framework/test_manager.py index 5ce4777f6..1ba4b9fad 100644 --- a/watcher/tests/decision_engine/framework/test_manager.py +++ b/watcher/tests/decision_engine/framework/test_manager.py @@ -22,7 +22,7 @@ from watcher.decision_engine.framework.events.event_consumer_factory import \ EventConsumerFactory from watcher.common.messaging.events.event import Event -from watcher.decision_engine.framework.manager_decision_engine import \ +from watcher.decision_engine.framework.manager import \ DecisionEngineManager from watcher.decision_engine.framework.messaging.events import Events diff --git a/watcher/tests/decision_engine/strategies/test_basic_consolidation.py b/watcher/tests/decision_engine/strategies/test_basic_consolidation.py index 45250ec85..6801ea21b 100644 --- a/watcher/tests/decision_engine/strategies/test_basic_consolidation.py +++ b/watcher/tests/decision_engine/strategies/test_basic_consolidation.py @@ -16,7 +16,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from collections import Counter + import mock +from mock import MagicMock + from watcher.common import exception from watcher.decision_engine.framework.meta_actions.hypervisor_state import \ @@ -30,11 +34,9 @@ from watcher.decision_engine.strategies.basic_consolidation import \ BasicConsolidation from watcher.tests import base from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector + FakerModelCollector from watcher.tests.decision_engine.faker_metrics_collector import \ FakerMetricsCollector -# from watcher.tests.decision_engine.faker_metrics_collector import \ -# FakerMetricsCollectorEmptyType class TestBasicConsolidation(base.BaseTestCase): @@ -42,7 +44,7 @@ class TestBasicConsolidation(base.BaseTestCase): fake_metrics = FakerMetricsCollector() # fake cluster - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() def test_cluster_size(self): size_cluster = len( @@ -53,18 +55,20 @@ class TestBasicConsolidation(base.BaseTestCase): def test_basic_consolidation_score_hypervisor(self): cluster = self.fake_cluster.generate_scenario_1() sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(self.fake_metrics) - node_1_score = 0.01666666666666668 + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + + node_1_score = 0.023333333333333317 self.assertEqual( sercon.calculate_score_node( cluster.get_hypervisor_from_id("Node_1"), cluster), node_1_score) - node_2_score = 0.01666666666666668 + node_2_score = 0.26666666666666666 self.assertEqual( sercon.calculate_score_node( cluster.get_hypervisor_from_id("Node_2"), cluster), node_2_score) - node_0_score = 0.01666666666666668 + node_0_score = 0.023333333333333317 self.assertEqual( sercon.calculate_score_node( cluster.get_hypervisor_from_id("Node_0"), @@ -73,7 +77,8 @@ class TestBasicConsolidation(base.BaseTestCase): def test_basic_consolidation_score_vm(self): cluster = self.fake_cluster.generate_scenario_1() sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(self.fake_metrics) + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) vm_0 = cluster.get_vm_from_id("VM_0") vm_0_score = 0.0 self.assertEqual(sercon.calculate_score_vm(vm_0, cluster), vm_0_score) @@ -95,7 +100,8 @@ class TestBasicConsolidation(base.BaseTestCase): def test_basic_consolidation_score_vm_disk(self): cluster = self.fake_cluster.generate_scenario_5_with_vm_disk_0() sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(self.fake_metrics) + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) vm_0 = cluster.get_vm_from_id("VM_0") vm_0_score = 0.0 self.assertEqual(sercon.calculate_score_vm(vm_0, cluster), vm_0_score) @@ -103,7 +109,8 @@ class TestBasicConsolidation(base.BaseTestCase): def test_basic_consolidation_weight(self): cluster = self.fake_cluster.generate_scenario_1() sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(self.fake_metrics) + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) vm_0 = cluster.get_vm_from_id("VM_0") cores = 16 # 80 Go @@ -130,16 +137,13 @@ class TestBasicConsolidation(base.BaseTestCase): self.assertRaises(exception.ClusterEmpty, sercon.execute, model) - def test_calculate_score_vm_raise_metric_collector(self): - sercon = BasicConsolidation() - self.assertRaises(exception.MetricCollectorNotDefined, - sercon.calculate_score_vm, "VM_1", None) - def test_calculate_score_vm_raise_cluster_state_not_found(self): metrics = FakerMetricsCollector() metrics.empty_one_metric("CPU_COMPUTE") sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(metrics) + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + self.assertRaises(exception.ClusteStateNotDefined, sercon.calculate_score_vm, "VM_1", None) @@ -168,7 +172,7 @@ class TestBasicConsolidation(base.BaseTestCase): def test_check_migration(self): sercon = BasicConsolidation() - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() all_vms = model.get_all_vms() @@ -180,7 +184,7 @@ class TestBasicConsolidation(base.BaseTestCase): def test_threshold(self): sercon = BasicConsolidation() - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() all_hyps = model.get_all_hypervisors() @@ -197,122 +201,48 @@ class TestBasicConsolidation(base.BaseTestCase): sercon.get_number_of_released_nodes() sercon.get_number_of_migrations() - def test_calculate_score_node_raise_1(self): - sercon = BasicConsolidation() - metrics = FakerStateCollector() - - model = metrics.generate_scenario_4_with_2_hypervisors() - all_hyps = model.get_all_hypervisors() - hyp0 = all_hyps[all_hyps.keys()[0]] - - self.assertRaises(exception.MetricCollectorNotDefined, - sercon.calculate_score_node, hyp0, model) - - def test_calculate_score_node_raise_cpu_compute(self): - metrics = FakerMetricsCollector() - metrics.empty_one_metric("CPU_COMPUTE") - sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(metrics) - current_state_cluster = FakerStateCollector() - model = current_state_cluster.generate_scenario_4_with_2_hypervisors() - - all_hyps = model.get_all_hypervisors() - hyp0 = all_hyps[all_hyps.keys()[0]] - - self.assertRaises(exception.NoDataFound, - sercon.calculate_score_node, hyp0, model) - - """ - def test_calculate_score_node_raise_memory_compute(self): - metrics = FakerMetricsCollector() - metrics.empty_one_metric("MEM_COMPUTE") - sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(metrics) - current_state_cluster = FakerStateCollector() - model = current_state_cluster.generate_scenario_4_with_2_hypervisors() - - all_hyps = model.get_all_hypervisors() - hyp0 = all_hyps[all_hyps.keys()[0]] - self.assertRaises(exception.NoDataFound, - sercon.calculate_score_node, hyp0, model) - - def test_calculate_score_node_raise_disk_compute(self): - metrics = FakerMetricsCollector() - metrics.empty_one_metric("DISK_COMPUTE") - sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(metrics) - current_state_cluster = FakerStateCollector() - model = current_state_cluster.generate_scenario_4_with_2_hypervisors() - - all_hyps = model.get_all_hypervisors() - hyp0 = all_hyps[all_hyps.keys()[0]] - - self.assertRaises(exception.NoDataFound, - sercon.calculate_score_node, hyp0, model) - """ - def test_basic_consolidation_migration(self): sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(FakerMetricsCollector()) - solution = None - solution = sercon.execute(FakerStateCollector().generate_scenario_1()) + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) - count_migration = 0 - change_hypervisor_state = 0 - change_power_state = 0 - migrate = [] - for action in solution.meta_actions: - if isinstance(action, Migrate): - count_migration += 1 - migrate.append(action) - if isinstance(action, ChangeHypervisorState): - change_hypervisor_state += 1 - if isinstance(action, ChangePowerState): - change_power_state += 1 + solution = sercon.execute( + self.fake_cluster.generate_scenario_3()) - # self.assertEqual(change_hypervisor_state, 1) - # self.assertEqual(count_migration, 2) + actions_counter = Counter( + [type(action) for action in solution.meta_actions]) + + expected_num_migrations = 0 + expected_power_state = 0 + expected_change_hypervisor_state = 0 + + num_migrations = actions_counter.get(Migrate, 0) + num_hypervisor_state_change = actions_counter.get( + ChangeHypervisorState, 0) + num_power_state_change = actions_counter.get( + ChangePowerState, 0) + + self.assertEqual(num_migrations, expected_num_migrations) + self.assertEqual(num_hypervisor_state_change, expected_power_state) + self.assertEqual(num_power_state_change, + expected_change_hypervisor_state) def test_execute_cluster_empty(self): - metrics = FakerMetricsCollector() - current_state_cluster = FakerStateCollector() - + current_state_cluster = FakerModelCollector() sercon = BasicConsolidation("sercon", "Basic offline consolidation") - sercon.set_metrics_resource_collector(metrics) + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) model = current_state_cluster.generate_random(0, 0) self.assertRaises(exception.ClusterEmpty, sercon.execute, model) - def test_basic_consolidation_random(self): - metrics = FakerMetricsCollector() - current_state_cluster = FakerStateCollector() - - sercon = BasicConsolidation("sercon", "Basic offline consolidation") - sercon.set_metrics_resource_collector(metrics) - - solution = sercon.execute( - current_state_cluster.generate_random(25, 2)) - solution.__str__() - - count_migration = 0 - change_hypervisor_state = 0 - change_power_state = 0 - migrate = [] - for action in solution.meta_actions: - if isinstance(action, Migrate): - count_migration += 1 - migrate.append(action) - if isinstance(action, ChangeHypervisorState): - change_hypervisor_state += 1 - if isinstance(action, ChangePowerState): - change_power_state += 1 - # calculate_weight def test_execute_no_workload(self): - metrics = FakerMetricsCollector() sercon = BasicConsolidation() - sercon.set_metrics_resource_collector(metrics) - current_state_cluster = FakerStateCollector() - model = current_state_cluster.\ + sercon.ceilometer = MagicMock( + statistic_aggregation=self.fake_metrics.mock_get_statistics) + + current_state_cluster = FakerModelCollector() + model = current_state_cluster. \ generate_scenario_5_with_1_hypervisor_no_vm() with mock.patch.object(BasicConsolidation, 'calculate_weight') \ diff --git a/watcher/tests/decision_engine/strategies/test_dummy_strategy.py b/watcher/tests/decision_engine/strategies/test_dummy_strategy.py index c94a3aa52..8ae444449 100644 --- a/watcher/tests/decision_engine/strategies/test_dummy_strategy.py +++ b/watcher/tests/decision_engine/strategies/test_dummy_strategy.py @@ -16,12 +16,12 @@ from watcher.decision_engine.strategies.dummy_strategy import DummyStrategy from watcher.tests import base from watcher.tests.decision_engine.faker_cluster_state import \ - FakerStateCollector + FakerModelCollector class TestDummyStrategy(base.TestCase): def test_dummy_strategy(self): tactique = DummyStrategy("basic", "Basic offline consolidation") - fake_cluster = FakerStateCollector() + fake_cluster = FakerModelCollector() model = fake_cluster.generate_scenario_4_with_2_hypervisors() tactique.execute(model) diff --git a/watcher/tests/decision_engine/test_planner.py b/watcher/tests/decision_engine/test_planner.py deleted file mode 100644 index 69fd53513..000000000 --- a/watcher/tests/decision_engine/test_planner.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from watcher.tests import base - - -class TestPlanner(base.BaseTestCase): - def test_planner(self): - pass