Integration of Ceilometer in Watcher
In current implementation is not easy to use ceilometer. Watcher must query metrics from the Telemetry v2 API to allow an easiest integration with OpenStack components (especially devstack). blueprint telemetry-integration Change-Id: Ide515472f1d160925d9f4aabf48c96dea4f6bc05
This commit is contained in:
@@ -46,7 +46,7 @@ This component pushes the new predicted metrics to the CEP in order to trigger n
|
||||
|
||||
## Watcher Cluster State Collector
|
||||
|
||||
This module of the Decision Engine provides a high level API for requesting status information from the InfluxDb database.
|
||||
This module of the Decision Engine provides a high level API for requesting status information from the Nova API.
|
||||
|
||||
A DSL will be provided in order to ease the development of new optimization strategies.
|
||||
|
||||
@@ -55,14 +55,6 @@ Example of high level requests that may be provided :
|
||||
* get the state evolution in time of a group of instances from 9 AM to 10 AM for every day of the week
|
||||
* ...
|
||||
|
||||
## Watcher Resource Metrics Collector
|
||||
|
||||
This module of the Decision Engine provides a high level API for requesting metrics information from the InfluxDb database.
|
||||
|
||||
A DSL will be provided in order to ease the development of new optimization strategies.
|
||||
|
||||
This component is distinct from the Cluster State Collector because it will probably have to deal with a much more important set of data and it may need a specific DSL for applying mathematical computes on metrics (min, max, average, ...).
|
||||
|
||||
|
||||
## Watcher Actions Planner
|
||||
|
||||
|
||||
@@ -16,8 +16,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
class DecisionEngineCommand(object):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseDecisionEngineCommand(object):
|
||||
@abc.abstractmethod
|
||||
def execute(self):
|
||||
raise NotImplementedError("Should have implemented this")
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -16,15 +16,24 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
class EventConsumer(object):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseEventConsumer(object):
|
||||
|
||||
def __init__(self):
|
||||
self.messaging = None
|
||||
self._messaging = None
|
||||
|
||||
def set_messaging(self, messaging):
|
||||
self.messaging = messaging
|
||||
@property
|
||||
def messaging(self):
|
||||
return self._messaging
|
||||
|
||||
@messaging.setter
|
||||
def messaging(self, e):
|
||||
self._messaging = e
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute(self, request_id, context, data):
|
||||
raise NotImplementedError('Not implemented ...')
|
||||
raise NotImplementedError('Not implemented ...') # pragma:no cover
|
||||
|
||||
@@ -16,9 +16,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Planner(object):
|
||||
@abc.abstractmethod
|
||||
def schedule(self, context, audit_uuid, solution):
|
||||
"""The planner receives a solution to schedule
|
||||
|
||||
@@ -29,4 +33,5 @@ class Planner(object):
|
||||
and performance requirements are met.
|
||||
"""
|
||||
# example: directed acyclic graph
|
||||
raise NotImplementedError("Should have implemented this")
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -16,22 +16,47 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Solution(object):
|
||||
def __init__(self):
|
||||
self.modelOrigin = None
|
||||
self.currentModel = None
|
||||
self.efficiency = 0
|
||||
self._origin = None
|
||||
self._model = None
|
||||
self._efficiency = 0
|
||||
|
||||
def get_efficiency(self):
|
||||
return self.efficiency
|
||||
@property
|
||||
def efficiency(self):
|
||||
return self._efficiency
|
||||
|
||||
def set_efficiency(self, efficiency):
|
||||
self.efficiency = efficiency
|
||||
@efficiency.setter
|
||||
def efficiency(self, e):
|
||||
self._efficiency = e
|
||||
|
||||
def set_model(self, current_model):
|
||||
self.currentModel = current_model
|
||||
@property
|
||||
def model(self):
|
||||
return self._model
|
||||
|
||||
def get_model(self):
|
||||
return self.currentModel
|
||||
@model.setter
|
||||
def model(self, m):
|
||||
self._model = m
|
||||
|
||||
@property
|
||||
def origin(self):
|
||||
return self._origin
|
||||
|
||||
@origin.setter
|
||||
def origin(self, m):
|
||||
self._origin = m
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_change_request(self, r):
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@abc.abstractproperty
|
||||
def meta_actions(self):
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -16,8 +16,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Solution(object):
|
||||
@abc.abstractmethod
|
||||
def compare(self, sol1, sol2):
|
||||
raise NotImplementedError("Should have implemented this")
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -16,8 +16,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class SolutionEvaluator(object):
|
||||
@abc.abstractmethod
|
||||
def evaluate(self, solution):
|
||||
raise NotImplementedError("Should have implemented this")
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -16,26 +16,33 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
from watcher.decision_engine.api.strategy.strategy import StrategyLevel
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class MetaAction(object):
|
||||
def __init__(self):
|
||||
self.level = StrategyLevel.conservative
|
||||
self.priority = 0
|
||||
self._level = StrategyLevel.conservative
|
||||
self._priority = 0
|
||||
|
||||
def get_level(self):
|
||||
return self.level
|
||||
@property
|
||||
def level(self):
|
||||
return self._level
|
||||
|
||||
def set_level(self, level):
|
||||
self.level = level
|
||||
@level.setter
|
||||
def level(self, l):
|
||||
self._level = l
|
||||
|
||||
def set_priority(self, priority):
|
||||
self.priority = priority
|
||||
@property
|
||||
def priority(self):
|
||||
return self._priority
|
||||
|
||||
def get_priority(self):
|
||||
return self.priority
|
||||
@priority.setter
|
||||
def priority(self, p):
|
||||
self._priority = p
|
||||
|
||||
def __str__(self):
|
||||
return " "
|
||||
|
||||
@@ -16,8 +16,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Selector(object):
|
||||
@abc.abstractmethod
|
||||
def define_from_goal(self, goal_name):
|
||||
raise NotImplementedError("Should have implemented this")
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -28,34 +28,13 @@ LOG = log.getLogger(__name__)
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Strategy(object):
|
||||
def __init__(self, name=None, description=None):
|
||||
self.name = name
|
||||
self._name = name
|
||||
self.description = description
|
||||
# default strategy level
|
||||
self.strategy_level = StrategyLevel.conservative
|
||||
self.metrics_collector = None
|
||||
self.cluster_state_collector = None
|
||||
self._strategy_level = StrategyLevel.conservative
|
||||
self._cluster_state_collector = None
|
||||
# the solution given by the strategy
|
||||
self.solution = DefaultSolution()
|
||||
|
||||
def get_solution(self):
|
||||
return self.solution
|
||||
|
||||
def set_name(self, name):
|
||||
self.name = name
|
||||
|
||||
def get_name(self):
|
||||
return self.name
|
||||
|
||||
def get_strategy_strategy_level(self):
|
||||
return self.strategy_level
|
||||
|
||||
def set_strategy_strategy_level(self, strategy_level):
|
||||
"""Convervative to Aggressive
|
||||
|
||||
the aims is to minimize le number of migrations
|
||||
:param threshold:
|
||||
"""
|
||||
self.strategy_level = strategy_level
|
||||
self._solution = DefaultSolution()
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute(self, model):
|
||||
@@ -65,14 +44,34 @@ class Strategy(object):
|
||||
:return:
|
||||
"""
|
||||
|
||||
def get_metrics_resource_collector(self):
|
||||
return self.metrics_collector
|
||||
@property
|
||||
def solution(self):
|
||||
return self._solution
|
||||
|
||||
def get_cluster_state_collector(self):
|
||||
return self.cluster_state_collector
|
||||
@solution.setter
|
||||
def solution(self, s):
|
||||
self._solution = s
|
||||
|
||||
def set_metrics_resource_collector(self, metrics_collector):
|
||||
self.metrics_collector = metrics_collector
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
def set_cluster_state_collector(self, cluster_state_collector):
|
||||
self.cluster_state_collector = cluster_state_collector
|
||||
@name.setter
|
||||
def name(self, n):
|
||||
self._name = n
|
||||
|
||||
@property
|
||||
def strategy_level(self):
|
||||
return self._strategy_level
|
||||
|
||||
@strategy_level.setter
|
||||
def strategy_level(self, s):
|
||||
self._strategy_level = s
|
||||
|
||||
@property
|
||||
def state_collector(self):
|
||||
return self._cluster_state_collector
|
||||
|
||||
@state_collector.setter
|
||||
def state_collector(self, s):
|
||||
self._cluster_state_collector = s
|
||||
|
||||
@@ -16,11 +16,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
class StrategyContext(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class BaseStrategyContext(object):
|
||||
@abc.abstractmethod
|
||||
def execute_strategy(self, model):
|
||||
raise NotImplementedError("Should have implemented this")
|
||||
raise NotImplementedError(
|
||||
"Should have implemented this") # pragma:no cover
|
||||
|
||||
@@ -17,11 +17,11 @@ from oslo_log import log
|
||||
|
||||
from watcher.common.messaging.events.event import Event
|
||||
from watcher.decision_engine.api.messaging.decision_engine_command import \
|
||||
DecisionEngineCommand
|
||||
BaseDecisionEngineCommand
|
||||
from watcher.decision_engine.framework.default_planner import DefaultPlanner
|
||||
from watcher.decision_engine.framework.messaging.events import Events
|
||||
from watcher.decision_engine.framework.strategy.StrategyManagerImpl import \
|
||||
StrategyContextImpl
|
||||
from watcher.decision_engine.framework.strategy.strategy_context import \
|
||||
StrategyContext
|
||||
from watcher.objects.audit import Audit
|
||||
from watcher.objects.audit import AuditStatus
|
||||
from watcher.objects.audit_template import AuditTemplate
|
||||
@@ -29,12 +29,11 @@ from watcher.objects.audit_template import AuditTemplate
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class TriggerAuditCommand(DecisionEngineCommand):
|
||||
def __init__(self, messaging, statedb, ressourcedb):
|
||||
class TriggerAuditCommand(BaseDecisionEngineCommand):
|
||||
def __init__(self, messaging, model_collector):
|
||||
self.messaging = messaging
|
||||
self.statedb = statedb
|
||||
self.ressourcedb = ressourcedb
|
||||
self.strategy_context = StrategyContextImpl()
|
||||
self.model_collector = model_collector
|
||||
self.strategy_context = StrategyContext()
|
||||
|
||||
def notify(self, audit_uuid, event_type, status):
|
||||
event = Event()
|
||||
@@ -46,7 +45,7 @@ class TriggerAuditCommand(DecisionEngineCommand):
|
||||
payload)
|
||||
|
||||
def update_audit(self, request_context, audit_uuid, state):
|
||||
LOG.debug("update audit " + str(state))
|
||||
LOG.debug("update audit {0} ".format(state))
|
||||
audit = Audit.get_by_uuid(request_context, audit_uuid)
|
||||
audit.state = state
|
||||
audit.save()
|
||||
@@ -61,16 +60,14 @@ class TriggerAuditCommand(DecisionEngineCommand):
|
||||
audit = self.update_audit(request_context, audit_uuid,
|
||||
AuditStatus.ONGOING)
|
||||
|
||||
# 3 - Retrieve metrics
|
||||
cluster = self.statedb.get_latest_state_cluster()
|
||||
# 3 - Retrieve cluster-data-model
|
||||
cluster = self.model_collector.get_latest_cluster_data_model()
|
||||
|
||||
# 4 - Select appropriate strategy
|
||||
audit_template = AuditTemplate.get_by_id(request_context,
|
||||
audit.audit_template_id)
|
||||
|
||||
self.strategy_context.set_goal(audit_template.goal)
|
||||
self.strategy_context.set_metrics_resource_collector(
|
||||
self.ressourcedb)
|
||||
|
||||
# 5 - compute change requests
|
||||
solution = self.strategy_context.execute_strategy(cluster)
|
||||
@@ -83,4 +80,4 @@ class TriggerAuditCommand(DecisionEngineCommand):
|
||||
self.update_audit(request_context, audit_uuid, AuditStatus.SUCCESS)
|
||||
except Exception as e:
|
||||
self.update_audit(request_context, audit_uuid, AuditStatus.FAILED)
|
||||
LOG.error(" " + unicode(e))
|
||||
LOG.error("Execute audit command {0} ".format(unicode(e)))
|
||||
|
||||
@@ -28,6 +28,7 @@ from watcher import objects
|
||||
from watcher.decision_engine.framework.meta_actions.hypervisor_state import \
|
||||
ChangeHypervisorState
|
||||
from watcher.decision_engine.framework.meta_actions.migrate import Migrate
|
||||
from watcher.decision_engine.framework.meta_actions.nop import Nop
|
||||
from watcher.decision_engine.framework.meta_actions.power_state import \
|
||||
ChangePowerState
|
||||
from watcher.objects.action import Status as AStatus
|
||||
@@ -35,6 +36,7 @@ from watcher.objects.action_plan import Status as APStatus
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO(jed) The default planner is a very simple planner
|
||||
# https://wiki.openstack.org/wiki/NovaOrchestration/WorkflowEngines
|
||||
|
||||
@@ -95,7 +97,8 @@ class DefaultPlanner(Planner):
|
||||
uuid,
|
||||
action.get_dest_hypervisor().
|
||||
uuid,
|
||||
description=str(action)
|
||||
description="{0}".format(
|
||||
action)
|
||||
)
|
||||
|
||||
elif isinstance(action, ChangePowerState):
|
||||
@@ -105,7 +108,9 @@ class DefaultPlanner(Planner):
|
||||
applies_to=action.target.uuid,
|
||||
parameter=action.
|
||||
powerstate.
|
||||
value, description=str(action))
|
||||
value,
|
||||
description="{0}".format(
|
||||
action))
|
||||
elif isinstance(action, ChangeHypervisorState):
|
||||
primitive = self.create_action(action_plan_id=action_plan.id,
|
||||
action_type=Primitives.
|
||||
@@ -113,8 +118,14 @@ class DefaultPlanner(Planner):
|
||||
applies_to=action.target.uuid,
|
||||
parameter=action.state.
|
||||
value,
|
||||
description=str(action))
|
||||
|
||||
description="{0}".format(
|
||||
action))
|
||||
elif isinstance(action, Nop):
|
||||
primitive = self.create_action(action_plan_id=action_plan.id,
|
||||
action_type=Primitives.
|
||||
NOP.value,
|
||||
description="{0}".format(
|
||||
action))
|
||||
else:
|
||||
raise MetaActionNotFound()
|
||||
priority = priority_primitives[primitive['action_type']]
|
||||
|
||||
@@ -32,8 +32,8 @@ from watcher.decision_engine.framework.messaging.events import Events
|
||||
|
||||
from watcher.common.messaging.notification_handler import \
|
||||
NotificationHandler
|
||||
from watcher.decision_engine.framework.strategy.StrategyManagerImpl import \
|
||||
StrategyContextImpl
|
||||
from watcher.decision_engine.framework.strategy.strategy_context import \
|
||||
StrategyContext
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
@@ -76,7 +76,7 @@ class DecisionEngineManager(MessagingCore):
|
||||
# todo(jed) oslo_conf
|
||||
self.executor = ThreadPoolExecutor(max_workers=2)
|
||||
self.topic_control.add_endpoint(AuditEndpoint(self))
|
||||
self.context = StrategyContextImpl(self)
|
||||
self.context = StrategyContext(self)
|
||||
|
||||
def join(self):
|
||||
self.topic_control.join()
|
||||
@@ -93,7 +93,7 @@ class DecisionEngineManager(MessagingCore):
|
||||
LOG.debug("data => %s" % str(data))
|
||||
|
||||
event_consumer = EventConsumerFactory().factory(event_type)
|
||||
event_consumer.set_messaging(self)
|
||||
event_consumer.messaging = self
|
||||
event_consumer.execute(request_id, data)
|
||||
except Exception as e:
|
||||
LOG.error("evt %s" % e.message)
|
||||
@@ -20,7 +20,8 @@ from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.framework.command.trigger_audit_command import \
|
||||
TriggerAuditCommand
|
||||
from watcher.metrics_engine.framework.collector_manager import CollectorManager
|
||||
from watcher.metrics_engine.cluster_model_collector.manager import \
|
||||
CollectorManager
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@@ -31,11 +32,9 @@ class AuditEndpoint(object):
|
||||
self.manager = CollectorManager()
|
||||
|
||||
def do_trigger_audit(self, context, audit_uuid):
|
||||
statedb = self.manager.get_statedb_collector()
|
||||
ressourcedb = self.manager.get_metric_collector()
|
||||
model_collector = self.manager.get_cluster_model_collector()
|
||||
|
||||
audit = TriggerAuditCommand(self.de, statedb,
|
||||
ressourcedb)
|
||||
audit = TriggerAuditCommand(self.de, model_collector)
|
||||
audit.execute(audit_uuid, context)
|
||||
|
||||
def trigger_audit(self, context, audit_uuid):
|
||||
|
||||
@@ -49,5 +49,5 @@ class ChangeHypervisorState(MetaAction):
|
||||
self._target = p
|
||||
|
||||
def __str__(self):
|
||||
return MetaAction.__str__(self) + " ChangeHypervisorState" + str(
|
||||
self.target) + " =>" + str(self.state)
|
||||
return "{0} {1} ChangeHypervisorState => {2}".format(
|
||||
MetaAction.__str__(self), self.target, self.state)
|
||||
|
||||
@@ -70,6 +70,7 @@ class Migrate(MetaAction):
|
||||
return self.dest_hypervisor
|
||||
|
||||
def __str__(self):
|
||||
return MetaAction.__str__(self) + " Migrate " + str(
|
||||
self.vm) + " from " + str(
|
||||
self.source_hypervisor) + " to " + str(self.dest_hypervisor)
|
||||
return "{0} Migrate {1} from {2} to {3}".format(
|
||||
MetaAction.__str__(self), self.vm,
|
||||
self.source_hypervisor,
|
||||
self.dest_hypervisor)
|
||||
|
||||
24
watcher/decision_engine/framework/meta_actions/nop.py
Normal file
24
watcher/decision_engine/framework/meta_actions/nop.py
Normal file
@@ -0,0 +1,24 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from watcher.decision_engine.api.strategy.meta_action import MetaAction
|
||||
|
||||
|
||||
class Nop(MetaAction):
|
||||
def __str__(self):
|
||||
return "{0} Nop".format(MetaAction.__str__(self))
|
||||
@@ -48,5 +48,6 @@ class ChangePowerState(MetaAction):
|
||||
self._target = p
|
||||
|
||||
def __str__(self):
|
||||
return MetaAction.__str__(self) + "ChangePowerState " + str(
|
||||
self.target) + " => " + str(self.powerstate)
|
||||
return "{0} ChangePowerState {1} => {2} ".format(
|
||||
MetaAction.__str__(self),
|
||||
self.target, self.powerstate)
|
||||
|
||||
@@ -69,9 +69,10 @@ class Mapping(object):
|
||||
# remove vm
|
||||
self.mapping_vm.pop(vm_uuid)
|
||||
else:
|
||||
LOG.warn("trying to delete the virtual machine " + str(
|
||||
vm_uuid) + " but it was not found on hypervisor" + str(
|
||||
node_uuid))
|
||||
LOG.warn(
|
||||
"trying to delete the virtual machine {0} but it was not "
|
||||
"found on hypervisor {1}".format(
|
||||
vm_uuid, node_uuid))
|
||||
finally:
|
||||
self.lock.release()
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ class ModelRoot(object):
|
||||
return self.mapping
|
||||
|
||||
def create_resource(self, r):
|
||||
self.resource[str(r.get_name())] = r
|
||||
self.resource[str(r.name)] = r
|
||||
|
||||
def get_resource_from_id(self, id):
|
||||
return self.resource[str(id)]
|
||||
|
||||
@@ -38,4 +38,4 @@ class NamedElement(object):
|
||||
self._human_id = h
|
||||
|
||||
def __str__(self):
|
||||
return "[" + str(self.uuid) + "]"
|
||||
return "[{0}]".format(self.uuid)
|
||||
|
||||
@@ -31,12 +31,17 @@ class Resource(object):
|
||||
:param capacity: max
|
||||
:return:
|
||||
"""
|
||||
self.name = name
|
||||
self._name = name
|
||||
self.capacity = capacity
|
||||
self.mapping = {}
|
||||
|
||||
def get_name(self):
|
||||
return self.name
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
@name.setter
|
||||
def name(self, n):
|
||||
self._name = n
|
||||
|
||||
def set_capacity(self, element, value):
|
||||
self.mapping[element.uuid] = value
|
||||
|
||||
@@ -32,9 +32,9 @@ from watcher.common.messaging.utils.transport_url_builder import \
|
||||
TransportUrlBuilder
|
||||
from watcher.decision_engine.framework.events.event_consumer_factory import \
|
||||
EventConsumerFactory
|
||||
from watcher.decision_engine.framework.manager_decision_engine import \
|
||||
from watcher.decision_engine.framework.manager import \
|
||||
decision_engine_opt_group
|
||||
from watcher.decision_engine.framework.manager_decision_engine import \
|
||||
from watcher.decision_engine.framework.manager import \
|
||||
WATCHER_DECISION_ENGINE_OPTS
|
||||
|
||||
from watcher.decision_engine.framework.messaging.events import Events
|
||||
|
||||
@@ -15,8 +15,8 @@
|
||||
# limitations under the License.
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.api.strategy.strategy_context import \
|
||||
StrategyContext
|
||||
from watcher.decision_engine.api.strategy.strategy_context import\
|
||||
BaseStrategyContext
|
||||
from watcher.decision_engine.framework.default_planner import DefaultPlanner
|
||||
from watcher.decision_engine.framework.strategy.strategy_selector import \
|
||||
StrategySelector
|
||||
@@ -24,7 +24,7 @@ from watcher.decision_engine.framework.strategy.strategy_selector import \
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class StrategyContextImpl(StrategyContext):
|
||||
class StrategyContext(BaseStrategyContext):
|
||||
def __init__(self, broker=None):
|
||||
LOG.debug("Initializing decision_engine Engine API ")
|
||||
self.strategies = {}
|
||||
@@ -33,7 +33,6 @@ class StrategyContextImpl(StrategyContext):
|
||||
self.planner = DefaultPlanner()
|
||||
self.strategy_selector = StrategySelector()
|
||||
self.goal = None
|
||||
self.metrics_resource_collector = None
|
||||
|
||||
def add_strategy(self, strategy):
|
||||
self.strategies[strategy.name] = strategy
|
||||
@@ -45,12 +44,7 @@ class StrategyContextImpl(StrategyContext):
|
||||
def set_goal(self, goal):
|
||||
self.goal = goal
|
||||
|
||||
def set_metrics_resource_collector(self, metrics_resource_collector):
|
||||
self.metrics_resource_collector = metrics_resource_collector
|
||||
|
||||
def execute_strategy(self, model):
|
||||
# todo(jed) create thread + refactoring
|
||||
selected_strategy = self.strategy_selector.define_from_goal(self.goal)
|
||||
selected_strategy.set_metrics_resource_collector(
|
||||
self.metrics_resource_collector)
|
||||
return selected_strategy.execute(model)
|
||||
@@ -18,16 +18,11 @@
|
||||
#
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.framework.model.vm_state import VMState
|
||||
from watcher.metrics_engine.api.metrics_resource_collector import \
|
||||
AggregationFunction
|
||||
|
||||
from watcher.common.exception import ClusterEmpty
|
||||
from watcher.common.exception import ClusteStateNotDefined
|
||||
from watcher.common.exception import MetricCollectorNotDefined
|
||||
from watcher.common.exception import NoDataFound
|
||||
from watcher.decision_engine.api.strategy.strategy import Strategy
|
||||
from watcher.decision_engine.api.strategy.strategy import StrategyLevel
|
||||
|
||||
from watcher.decision_engine.framework.meta_actions.hypervisor_state import \
|
||||
ChangeHypervisorState
|
||||
from watcher.decision_engine.framework.meta_actions.migrate import Migrate
|
||||
@@ -40,6 +35,9 @@ from watcher.decision_engine.framework.meta_actions.power_state import \
|
||||
from watcher.decision_engine.framework.model.hypervisor_state import \
|
||||
HypervisorState
|
||||
from watcher.decision_engine.framework.model.resource import ResourceType
|
||||
from watcher.decision_engine.framework.model.vm_state import VMState
|
||||
from watcher.metrics_engine.cluster_history.ceilometer import \
|
||||
CeilometerClusterHistory
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@@ -73,7 +71,7 @@ class BasicConsolidation(Strategy):
|
||||
:param name: the name of the strategy
|
||||
:param description: a description of the strategy
|
||||
"""
|
||||
Strategy.__init__(self, name, description)
|
||||
super(BasicConsolidation, self).__init__(name, description)
|
||||
|
||||
# set default value for the number of released nodes
|
||||
self.number_of_released_nodes = 0
|
||||
@@ -85,6 +83,8 @@ class BasicConsolidation(Strategy):
|
||||
# set default value for the efficiency
|
||||
self.efficiency = 100
|
||||
|
||||
self._ceilometer = None
|
||||
|
||||
# TODO(jed) improve threshold overbooking ?,...
|
||||
self.threshold_mem = 1
|
||||
self.threshold_disk = 1
|
||||
@@ -101,6 +101,16 @@ class BasicConsolidation(Strategy):
|
||||
# TODO(jed) bound migration attempts (80 %)
|
||||
self.bound_migration = 0.80
|
||||
|
||||
@property
|
||||
def ceilometer(self):
|
||||
if self._ceilometer is None:
|
||||
self._ceilometer = CeilometerClusterHistory()
|
||||
return self._ceilometer
|
||||
|
||||
@ceilometer.setter
|
||||
def ceilometer(self, c):
|
||||
self._ceilometer = c
|
||||
|
||||
def compute_attempts(self, size_cluster):
|
||||
"""Upper bound of the number of migration
|
||||
|
||||
@@ -123,10 +133,10 @@ class BasicConsolidation(Strategy):
|
||||
if src_hypervisor == dest_hypervisor:
|
||||
return False
|
||||
|
||||
LOG.debug('Migrate VM %s from %s to %s ',
|
||||
str(src_hypervisor),
|
||||
str(dest_hypervisor),
|
||||
str(vm_to_mig))
|
||||
LOG.debug('Migrate VM {0} from {1} to {2} '.format(vm_to_mig,
|
||||
src_hypervisor,
|
||||
dest_hypervisor,
|
||||
))
|
||||
|
||||
total_cores = 0
|
||||
total_disk = 0
|
||||
@@ -175,12 +185,6 @@ class BasicConsolidation(Strategy):
|
||||
cores_available = cap_cores.get_capacity(dest_hypervisor)
|
||||
disk_available = cap_disk.get_capacity(dest_hypervisor)
|
||||
mem_available = cap_mem.get_capacity(dest_hypervisor)
|
||||
LOG.debug("VCPU %s/%s ", str(total_cores * self.threshold_cores),
|
||||
str(cores_available), )
|
||||
LOG.debug("DISK %s/%s ", str(total_disk * self.threshold_disk),
|
||||
str(disk_available), )
|
||||
LOG.debug("MEM %s/%s ", str(total_mem * self.threshold_mem),
|
||||
str(mem_available))
|
||||
|
||||
if cores_available >= total_cores * self.threshold_cores \
|
||||
and disk_available >= total_disk * self.threshold_disk \
|
||||
@@ -213,7 +217,6 @@ class BasicConsolidation(Strategy):
|
||||
def calculate_weight(self, model, element, total_cores_used,
|
||||
total_disk_used, total_memory_used):
|
||||
"""Calculate weight of every
|
||||
|
||||
:param model:
|
||||
:param element:
|
||||
:param total_cores_used:
|
||||
@@ -253,24 +256,22 @@ class BasicConsolidation(Strategy):
|
||||
:param model:
|
||||
:return:
|
||||
"""
|
||||
metrics_collector = self.get_metrics_resource_collector()
|
||||
if metrics_collector is None:
|
||||
raise MetricCollectorNotDefined()
|
||||
cpu_avg_vm = self.ceilometer. \
|
||||
statistic_aggregation(resource_id=hypervisor.uuid,
|
||||
meter_name='compute.node.cpu.percent',
|
||||
period="7200",
|
||||
aggregate='avg'
|
||||
)
|
||||
if cpu_avg_vm is None:
|
||||
LOG.error(
|
||||
"No values returned for {0} compute.node.cpu.percent".format(
|
||||
hypervisor.uuid))
|
||||
cpu_avg_vm = 100
|
||||
|
||||
cpu_compute_mean_16h = metrics_collector.get_measurement(
|
||||
metric='compute_cpu_user_percent_gauge',
|
||||
aggregation_function=AggregationFunction.MEAN,
|
||||
start_time="16 hours before now",
|
||||
filters=["resource_id=" + hypervisor.uuid + ""])
|
||||
if len(cpu_compute_mean_16h) > 0:
|
||||
cpu_capacity = model.get_resource_from_id(
|
||||
ResourceType.cpu_cores).get_capacity(hypervisor)
|
||||
cpu_utilization = float(cpu_compute_mean_16h[0].value)
|
||||
total_cores_used = cpu_capacity * (cpu_utilization / 100)
|
||||
else:
|
||||
raise NoDataFound(
|
||||
"No values returned for " + str(hypervisor.uuid) +
|
||||
" compute_cpu_percent_gauge")
|
||||
cpu_capacity = model.get_resource_from_id(
|
||||
ResourceType.cpu_cores).get_capacity(hypervisor)
|
||||
|
||||
total_cores_used = cpu_capacity * (cpu_avg_vm / 100)
|
||||
|
||||
return self.calculate_weight(model, hypervisor, total_cores_used,
|
||||
0,
|
||||
@@ -295,29 +296,30 @@ class BasicConsolidation(Strategy):
|
||||
:param model: the model
|
||||
:return: score
|
||||
"""
|
||||
# todo(jed) inject ressource metric
|
||||
metric_collector = self.get_metrics_resource_collector()
|
||||
if metric_collector is None:
|
||||
raise MetricCollectorNotDefined()
|
||||
|
||||
if model is None:
|
||||
raise ClusteStateNotDefined()
|
||||
|
||||
vm = model.get_vm_from_id(vm.uuid)
|
||||
instance_cpu_mean_16 = metric_collector.get_measurement(
|
||||
metric='instance_cpu_percent_gauge',
|
||||
aggregation_function=AggregationFunction.MEAN,
|
||||
start_time="16 hours before now",
|
||||
filters=["resource_id=" + vm.uuid + ""])
|
||||
|
||||
if len(instance_cpu_mean_16) > 0:
|
||||
cpu_capacity = model.get_resource_from_id(
|
||||
ResourceType.cpu_cores).get_capacity(vm)
|
||||
vm_cpu_utilization = instance_cpu_mean_16[0].value
|
||||
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100)
|
||||
else:
|
||||
raise NoDataFound("No values returned for " + str(vm.uuid) +
|
||||
" instance_cpu_percent_gauge")
|
||||
vm_cpu_utilization = self.ceilometer. \
|
||||
statistic_aggregation(resource_id=vm.uuid,
|
||||
meter_name='cpu_util',
|
||||
period="7200",
|
||||
aggregate='avg'
|
||||
)
|
||||
if vm_cpu_utilization is None:
|
||||
LOG.error(
|
||||
"No values returned for {0} cpu_util".format(vm.uuid))
|
||||
vm_cpu_utilization = 100
|
||||
|
||||
cpu_capacity = model.get_resource_from_id(
|
||||
ResourceType.cpu_cores).get_capacity(vm)
|
||||
|
||||
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100)
|
||||
|
||||
return self.calculate_weight(model, vm, total_cores_used,
|
||||
0,
|
||||
0)
|
||||
|
||||
return self.calculate_weight(model, vm, total_cores_used,
|
||||
0,
|
||||
@@ -327,11 +329,12 @@ class BasicConsolidation(Strategy):
|
||||
if model is None:
|
||||
raise ClusteStateNotDefined()
|
||||
for node_id in model.get_all_hypervisors():
|
||||
builder = node_id + " utilization " + str(
|
||||
(self.calculate_score_node(
|
||||
model.get_hypervisor_from_id(node_id),
|
||||
model)) * 100) + " %"
|
||||
LOG.debug(builder)
|
||||
LOG.debug("{0} utilization {1} % ".
|
||||
format(node_id,
|
||||
self.calculate_score_node(
|
||||
model.get_hypervisor_from_id(
|
||||
node_id),
|
||||
model)))
|
||||
|
||||
def execute(self, orign_model):
|
||||
LOG.debug("initialize Sercon Consolidation")
|
||||
@@ -352,18 +355,18 @@ class BasicConsolidation(Strategy):
|
||||
|
||||
self.compute_attempts(size_cluster)
|
||||
|
||||
for hypevisor_id in current_model.get_all_hypervisors():
|
||||
hypervisor = current_model.get_hypervisor_from_id(hypevisor_id)
|
||||
for hypervisor_id in current_model.get_all_hypervisors():
|
||||
hypervisor = current_model.get_hypervisor_from_id(hypervisor_id)
|
||||
count = current_model.get_mapping(). \
|
||||
get_node_vms_from_id(hypevisor_id)
|
||||
get_node_vms_from_id(hypervisor_id)
|
||||
if len(count) == 0:
|
||||
change_power = ChangePowerState(hypervisor)
|
||||
change_power.powerstate = PowerState.g1_S1
|
||||
change_power.set_level(StrategyLevel.conservative)
|
||||
change_power.level = StrategyLevel.conservative
|
||||
self.solution.add_change_request(change_power)
|
||||
if hypervisor.state == HypervisorState.ONLINE:
|
||||
h = ChangeHypervisorState(hypervisor)
|
||||
h.set_level(StrategyLevel.aggressive)
|
||||
h.level = StrategyLevel.aggressive
|
||||
h.state = HypervisorState.OFFLINE
|
||||
self.solution.add_change_request(h)
|
||||
|
||||
@@ -376,10 +379,11 @@ class BasicConsolidation(Strategy):
|
||||
score = []
|
||||
|
||||
''' calculate score of nodes based on load by VMs '''
|
||||
for hypevisor_id in current_model.get_all_hypervisors():
|
||||
hypervisor = current_model.get_hypervisor_from_id(hypevisor_id)
|
||||
for hypervisor_id in current_model.get_all_hypervisors():
|
||||
hypervisor = current_model.get_hypervisor_from_id(
|
||||
hypervisor_id)
|
||||
count = current_model.get_mapping(). \
|
||||
get_node_vms_from_id(hypevisor_id)
|
||||
get_node_vms_from_id(hypervisor_id)
|
||||
if len(count) > 0:
|
||||
result = self.calculate_score_node(hypervisor,
|
||||
current_model)
|
||||
@@ -387,11 +391,11 @@ class BasicConsolidation(Strategy):
|
||||
''' the hypervisor has not VMs '''
|
||||
result = 0
|
||||
if len(count) > 0:
|
||||
score.append((hypevisor_id, result))
|
||||
score.append((hypervisor_id, result))
|
||||
|
||||
''' sort compute nodes by Score decreasing '''''
|
||||
s = sorted(score, reverse=True, key=lambda x: (x[1]))
|
||||
LOG.debug("Hypervisor(s) BFD {0}".format(str(s)))
|
||||
LOG.debug("Hypervisor(s) BFD {0}".format(s))
|
||||
|
||||
''' get Node to be released '''
|
||||
if len(score) == 0:
|
||||
@@ -415,7 +419,7 @@ class BasicConsolidation(Strategy):
|
||||
|
||||
''' sort VM's by Score '''
|
||||
v = sorted(vm_score, reverse=True, key=lambda x: (x[1]))
|
||||
LOG.debug("VM(s) BFD {0}".format(str(v)))
|
||||
LOG.debug("VM(s) BFD {0}".format(v))
|
||||
|
||||
m = 0
|
||||
tmp_vm_migration_schedule = []
|
||||
@@ -442,8 +446,7 @@ class BasicConsolidation(Strategy):
|
||||
# live migration
|
||||
live_migrate.set_migration_type(
|
||||
MigrationType.pre_copy)
|
||||
live_migrate.set_level(
|
||||
StrategyLevel.conservative)
|
||||
live_migrate.level = StrategyLevel.conservative
|
||||
|
||||
tmp_vm_migration_schedule.append(live_migrate)
|
||||
|
||||
@@ -453,12 +456,11 @@ class BasicConsolidation(Strategy):
|
||||
# from conservative to aggressive
|
||||
change_power = ChangePowerState(mig_src_hypervisor)
|
||||
change_power.powerstate = PowerState.g1_S1
|
||||
change_power.set_level(
|
||||
StrategyLevel.conservative)
|
||||
change_power.level = StrategyLevel.conservative
|
||||
tmp_vm_migration_schedule.append(change_power)
|
||||
|
||||
h = ChangeHypervisorState(mig_src_hypervisor)
|
||||
h.set_level(StrategyLevel.aggressive)
|
||||
h.level = StrategyLevel.aggressive
|
||||
|
||||
h.state = HypervisorState.OFFLINE
|
||||
tmp_vm_migration_schedule.append(h)
|
||||
@@ -481,6 +483,6 @@ class BasicConsolidation(Strategy):
|
||||
"efficiency": self.efficiency
|
||||
}
|
||||
LOG.debug(infos)
|
||||
self.solution.set_model(current_model)
|
||||
self.solution.set_efficiency(self.efficiency)
|
||||
self.solution.model = current_model
|
||||
self.solution.efficiency = self.efficiency
|
||||
return self.solution
|
||||
|
||||
@@ -19,10 +19,13 @@
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.api.strategy.strategy import Strategy
|
||||
from watcher.decision_engine.framework.meta_actions.nop import Nop
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DummyStrategy(Strategy):
|
||||
def execute(self, model):
|
||||
return self.get_solution()
|
||||
n = Nop()
|
||||
self.solution.add_change_request(n)
|
||||
return self.solution
|
||||
|
||||
Reference in New Issue
Block a user