Add a generic and extensible way to describe the flow of actions

In watcher, an audit generates a set of actions which
aims at achieving a given goal (lower energy consumption, ...).
It is possible to configure different strategies in order to achieve
each goal. Each strategy is written as a Python class which produces
a set of actions. Today, the set of possible actions is fixed for a
given version of Watcher and enables optimization algorithms to
include actions such as instance migration, changing hypervisor state,
changing power state (ACPI level, ...).

This patchset propose a generic and extensible way to describe
the actions and his parameters that we want to add to Action Plan.
It also remove the static actions because they are now deprecated.

The documentation regarding strategy plugin need to be
updated (plugins.rst).

DocImpact
Partially implements: blueprint watcher-add-actions-via-conf

Change-Id: I3d641080e8ad89786abca79a942c8deb2d53355b
This commit is contained in:
Jean-Emile DARTOIS
2016-01-08 12:03:55 +01:00
parent 47759202a8
commit 86d3c2ff89
20 changed files with 191 additions and 524 deletions

View File

@@ -1,45 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import six
from watcher.decision_engine.strategy.common.level import StrategyLevel
@six.add_metaclass(abc.ABCMeta)
class BaseAction(object):
def __init__(self):
self._level = StrategyLevel.conservative
self._priority = 0
@property
def level(self):
return self._level
@level.setter
def level(self, l):
self._level = l
@property
def priority(self):
return self._priority
@priority.setter
def priority(self, p):
self._priority = p

View File

@@ -1,52 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from watcher.decision_engine.actions.base import BaseAction
from watcher.decision_engine.model.hypervisor_state import HypervisorState
class ChangeHypervisorState(BaseAction):
def __init__(self, target):
'''The target host to change the state
:param target: the target hypervisor uuid
'''
super(ChangeHypervisorState, self).__init__()
self._target = target
self._state = HypervisorState.ONLINE
@property
def state(self):
return self._state
@state.setter
def state(self, state):
self._state = state
@property
def target(self):
return self._target
@target.setter
def target(self, p):
self._target = p
def __str__(self):
return "{} ChangeHypervisorState => {}".format(self.target,
self.state)

View File

@@ -1,72 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from enum import Enum
from watcher.decision_engine.actions.base import BaseAction
class MigrationType(Enum):
# Total migration time and downtime depend on memory dirtying speed
pre_copy = 0
# Postcopy transfer a page only once reliability
post_copy = 1
class Migrate(BaseAction):
def __init__(self, vm, src_hypervisor, dest_hypervisor):
"""Request to migrate a virtual machine from a host to another
:param vm: the virtual machine uuid to migrate
:param src_hypervisor: uuid
:param dest_hypervisor: uuid
"""
super(Migrate, self).__init__()
self._reserved_disk_iops = 0
self._remaining_dirty_pages = 0
self._vm = vm
self._migration_type = MigrationType.pre_copy
self._src_hypervisor = src_hypervisor
self._dest_hypervisor = dest_hypervisor
@property
def migration_type(self):
return self._migration_type
@migration_type.setter
def migration_type(self, type):
self._migration_type = type
@property
def vm(self):
return self._vm
@property
def src_hypervisor(self):
return self._src_hypervisor
@property
def dest_hypervisor(self):
return self._dest_hypervisor
def __str__(self):
return "Migrate {} from {} to {}".format(
self.vm,
self.src_hypervisor,
self.dest_hypervisor)

View File

@@ -1,26 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from watcher.decision_engine.actions.base import BaseAction
class Nop(BaseAction):
def __str__(self):
return "Nop"

View File

@@ -1,52 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from watcher.decision_engine.actions.base import BaseAction
from watcher.decision_engine.model.power_state import PowerState
class ChangePowerState(BaseAction):
def __init__(self, target):
"""The target host to change the power
:param target:
"""
super(ChangePowerState, self).__init__()
self._target = target
self._power_state = PowerState.g0
@property
def powerstate(self):
return self._power_state
@powerstate.setter
def powerstate(self, p):
self._power_state = p
@property
def target(self):
return self._target
@target.setter
def target(self, t):
self._target = t
def __str__(self):
return "ChangePowerState {} => {} ".format(self.target,
self.powerstate)

View File

@@ -23,5 +23,5 @@ import six
@six.add_metaclass(abc.ABCMeta)
class BaseAuditHandler(object):
@abc.abstractmethod
def execute(self):
def execute(self, audit_uuid, request_context):
raise NotImplementedError()

View File

@@ -17,26 +17,19 @@
# limitations under the License.
#
import json
import enum
from oslo_log import log
from watcher._i18n import _LW
from watcher.common import exception
from watcher.common import utils
from watcher.decision_engine.actions import hypervisor_state
from watcher.decision_engine.actions import migration
from watcher.decision_engine.actions import nop
from watcher.decision_engine.actions import power_state
from watcher.decision_engine.planner import base
from watcher import objects
LOG = log.getLogger(__name__)
# TODO(jed) The default planner is a very simple planner
# https://wiki.openstack.org/wiki/NovaOrchestration/WorkflowEngines
class Primitives(enum.Enum):
LIVE_MIGRATE = 'MIGRATE'
COLD_MIGRATE = 'MIGRATE'
@@ -45,32 +38,25 @@ class Primitives(enum.Enum):
NOP = 'NOP'
priority_primitives = {
Primitives.NOP.value: 0,
Primitives.HYPERVISOR_STATE.value: 1,
Primitives.LIVE_MIGRATE.value: 2,
Primitives.COLD_MIGRATE.value: 3,
Primitives.POWER_STATE.value: 4,
}
class DefaultPlanner(base.BasePlanner):
def create_action(self, action_plan_id, action_type, applies_to=None,
src=None,
dst=None,
parameter=None,
description=None):
uuid = utils.generate_uuid()
priorities = {
'nop': 0,
'migrate': 1,
'change_nova_service_state': 2,
}
def create_action(self,
action_plan_id,
action_type,
applies_to,
input_parameters=None):
uuid = utils.generate_uuid()
action = {
'uuid': uuid,
'action_plan_id': int(action_plan_id),
'action_type': action_type,
'applies_to': applies_to,
'src': src,
'dst': dst,
'parameter': parameter,
'description': description,
'input_parameters': json.dumps(input_parameters),
'state': objects.action.Status.PENDING,
'alarm': None,
'next': None,
@@ -83,53 +69,19 @@ class DefaultPlanner(base.BasePlanner):
actions = list(solution.actions)
to_schedule = []
for action in actions:
if isinstance(action, migration.Migrate):
# TODO(jed) type
primitive = self.create_action(action_plan.id,
Primitives.LIVE_MIGRATE.value,
action.vm.uuid,
action.src_hypervisor.
uuid,
action.dest_hypervisor.
uuid,
description="{0}".format(
action)
)
elif isinstance(action, power_state.ChangePowerState):
primitive = self.create_action(action_plan_id=action_plan.id,
action_type=Primitives.
POWER_STATE.value,
applies_to=action.target.uuid,
parameter=action.
powerstate.
value,
description="{0}".format(
action))
elif isinstance(action, hypervisor_state.ChangeHypervisorState):
primitive = self.create_action(action_plan_id=action_plan.id,
action_type=Primitives.
HYPERVISOR_STATE.value,
applies_to=action.target.uuid,
parameter=action.state.
value,
description="{0}".format(
action))
elif isinstance(action, nop.Nop):
primitive = self.create_action(action_plan_id=action_plan.id,
action_type=Primitives.
NOP.value,
description="{0}".format(
action))
else:
raise exception.ActionNotFound()
priority = priority_primitives[primitive['action_type']]
to_schedule.append((priority, primitive))
json_action = self.create_action(action_plan_id=action_plan.id,
action_type=action.get(
'action_type'),
applies_to=action.get(
'applies_to'),
input_parameters=action.get(
'input_parameters'))
to_schedule.append((self.priorities[action.get('action_type')],
json_action))
# scheduling
scheduled = sorted(to_schedule, reverse=False, key=lambda x: (x[0]))
scheduled = sorted(to_schedule, key=lambda x: (x[0]))
if len(scheduled) == 0:
LOG.warning(_LW("The action plan is empty"))
action_plan.first_action_id = None
@@ -147,6 +99,7 @@ class DefaultPlanner(base.BasePlanner):
action = self._create_action(context, s_action[1],
parent_action)
parent_action = action
return action_plan
def _create_action_plan(self, context, audit_id):

View File

@@ -52,7 +52,21 @@ class BaseSolution(object):
self._origin = m
@abc.abstractmethod
def add_change_request(self, r):
def add_action(self,
action_type,
applies_to,
input_parameters=None):
"""Add a new Action in the Action Plan
:param action_type: the unique id of an action type defined in
entry point 'watcher_actions'
:param applies_to: the unique id of the resource to which the
`Action` applies.
:param input_parameters: An array of input parameters provided as
key-value pairs of strings.
Each key-pair contains names and values that match what was previously
defined in the `Action` type schema.
"""
raise NotImplementedError()
@abc.abstractproperty

View File

@@ -17,6 +17,7 @@
# limitations under the License.
#
from oslo_log import log
from watcher.decision_engine.solution.base import BaseSolution
LOG = log.getLogger(__name__)
@@ -32,8 +33,16 @@ class DefaultSolution(BaseSolution):
super(DefaultSolution, self).__init__()
self._actions = []
def add_change_request(self, r):
self._actions.append(r)
def add_action(self, action_type,
applies_to,
input_parameters=None):
# todo(jed) add https://pypi.python.org/pypi/schema
action = {
'action_type': action_type,
'applies_to': applies_to,
'input_parameters': input_parameters
}
self._actions.append(action)
def __str__(self):
return "\n".join(self._actions)

View File

@@ -20,18 +20,10 @@
from oslo_log import log
from watcher._i18n import _LE, _LI, _LW
from watcher.common.exception import ClusterEmpty
from watcher.common.exception import ClusterStateNotDefined
from watcher.decision_engine.actions.hypervisor_state import \
ChangeHypervisorState
from watcher.decision_engine.actions.migration import Migrate
from watcher.decision_engine.actions.migration import MigrationType
from watcher.decision_engine.actions.power_state import ChangePowerState
from watcher.common import exception
from watcher.decision_engine.model.hypervisor_state import HypervisorState
from watcher.decision_engine.model.power_state import PowerState
from watcher.decision_engine.model.resource import ResourceType
from watcher.decision_engine.model.vm_state import VMState
from watcher.decision_engine.strategy.common.level import StrategyLevel
from watcher.decision_engine.strategy.strategies.base import BaseStrategy
from watcher.metrics_engine.cluster_history.ceilometer import \
CeilometerClusterHistory
@@ -43,8 +35,11 @@ class BasicConsolidation(BaseStrategy):
DEFAULT_NAME = "basic"
DEFAULT_DESCRIPTION = "Basic offline consolidation"
host_cpu_usage_metric_name = 'compute.node.cpu.percent'
instance_cpu_usage_metric_name = 'cpu_util'
HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent'
INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util'
MIGRATION = "migrate"
CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION):
"""Basic offline Consolidation using live migration
@@ -125,14 +120,14 @@ class BasicConsolidation(BaseStrategy):
src_hypervisor,
dest_hypervisor,
vm_to_mig):
'''check if the migration is possible
"""check if the migration is possible
:param model: the current state of the cluster
:param src_hypervisor: the current node of the virtual machine
:param dest_hypervisor: the destination of the virtual machine
:param vm_to_mig: the virtual machine
:return: True if the there is enough place otherwise false
'''
"""
if src_hypervisor == dest_hypervisor:
return False
@@ -263,7 +258,7 @@ class BasicConsolidation(BaseStrategy):
resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname)
cpu_avg_vm = self.ceilometer. \
statistic_aggregation(resource_id=resource_id,
meter_name=self.host_cpu_usage_metric_name,
meter_name=self.HOST_CPU_USAGE_METRIC_NAME,
period="7200",
aggregate='avg'
)
@@ -272,7 +267,7 @@ class BasicConsolidation(BaseStrategy):
_LE("No values returned by %(resource_id)s "
"for %(metric_name)s"),
resource_id=resource_id,
metric_name=self.host_cpu_usage_metric_name,
metric_name=self.HOST_CPU_USAGE_METRIC_NAME,
)
cpu_avg_vm = 100
@@ -305,12 +300,12 @@ class BasicConsolidation(BaseStrategy):
:return: score
"""
if model is None:
raise ClusterStateNotDefined()
raise exception.ClusterStateNotDefined()
vm_cpu_utilization = self.ceilometer. \
statistic_aggregation(
resource_id=vm.uuid,
meter_name=self.instance_cpu_usage_metric_name,
meter_name=self.INSTANCE_CPU_USAGE_METRIC_NAME,
period="7200",
aggregate='avg'
)
@@ -319,7 +314,7 @@ class BasicConsolidation(BaseStrategy):
_LE("No values returned by %(resource_id)s "
"for %(metric_name)s"),
resource_id=vm.uuid,
metric_name=self.instance_cpu_usage_metric_name,
metric_name=self.INSTANCE_CPU_USAGE_METRIC_NAME,
)
vm_cpu_utilization = 100
@@ -332,22 +327,29 @@ class BasicConsolidation(BaseStrategy):
0,
0)
def print_utilization(self, model):
if model is None:
raise ClusterStateNotDefined()
for node_id in model.get_all_hypervisors():
LOG.debug("{0} utilization {1} % ".
format(node_id,
self.calculate_score_node(
model.get_hypervisor_from_id(
node_id),
model)))
def add_change_service_state(self, applies_to, state):
parameters = {'state': state}
self.solution.add_action(action_type=self.CHANGE_NOVA_SERVICE_STATE,
applies_to=applies_to,
input_parameters=parameters)
def add_migration(self,
applies_to,
migration_type,
src_hypervisor_uuid,
dst_hypervisor_uuid):
parameters = {'migration_type': migration_type,
'src_hypervisor_uuid': src_hypervisor_uuid,
'dst_hypervisor_uuid': dst_hypervisor_uuid}
self.solution.add_action(action_type=self.MIGRATION,
applies_to=applies_to,
input_parameters=parameters)
def execute(self, orign_model):
LOG.info(_LI("Initializing Sercon Consolidation"))
if orign_model is None:
raise ClusterStateNotDefined()
raise exception.ClusterStateNotDefined()
# todo(jed) clone model
current_model = orign_model
@@ -358,7 +360,7 @@ class BasicConsolidation(BaseStrategy):
first = True
size_cluster = len(current_model.get_all_hypervisors())
if size_cluster == 0:
raise ClusterEmpty()
raise exception.ClusterEmpty()
self.compute_attempts(size_cluster)
@@ -367,15 +369,10 @@ class BasicConsolidation(BaseStrategy):
count = current_model.get_mapping(). \
get_node_vms_from_id(hypervisor_id)
if len(count) == 0:
change_power = ChangePowerState(hypervisor)
change_power.powerstate = PowerState.g1_S1
change_power.level = StrategyLevel.conservative
self.solution.add_change_request(change_power)
if hypervisor.state == HypervisorState.ONLINE:
h = ChangeHypervisorState(hypervisor)
h.level = StrategyLevel.aggressive
h.state = HypervisorState.OFFLINE
self.solution.add_change_request(h)
self.add_change_service_state(hypervisor_id,
HypervisorState.
OFFLINE.value)
while self.get_allowed_migration_attempts() >= unsuccessful_migration:
if first is not True:
@@ -430,7 +427,6 @@ class BasicConsolidation(BaseStrategy):
LOG.debug("VM(s) BFD {0}".format(v))
m = 0
tmp_vm_migration_schedule = []
for vm in v:
for j in range(0, len(s)):
mig_vm = current_model.get_vm_from_id(vm[0])
@@ -448,31 +444,16 @@ class BasicConsolidation(BaseStrategy):
if current_model.get_mapping(). \
migrate_vm(mig_vm, mig_src_hypervisor,
mig_dst_hypervisor):
live_migrate = Migrate(mig_vm,
mig_src_hypervisor,
mig_dst_hypervisor)
# live migration
live_migrate.migration_type = \
MigrationType.pre_copy
live_migrate.level = StrategyLevel.conservative
tmp_vm_migration_schedule.append(live_migrate)
self.add_migration(mig_vm.uuid, 'live',
mig_src_hypervisor.uuid,
mig_dst_hypervisor.uuid)
if len(current_model.get_mapping().get_node_vms(
mig_src_hypervisor)) == 0:
# TODO(jed) how to manage strategy level
# from conservative to aggressive
change_power = ChangePowerState(mig_src_hypervisor)
change_power.powerstate = PowerState.g1_S1
change_power.level = StrategyLevel.conservative
tmp_vm_migration_schedule.append(change_power)
h = ChangeHypervisorState(mig_src_hypervisor)
h.level = StrategyLevel.aggressive
h.state = HypervisorState.OFFLINE
tmp_vm_migration_schedule.append(h)
self.add_change_service_state(mig_src_hypervisor.
uuid,
HypervisorState.
OFFLINE.value)
self.number_of_released_nodes += 1
m += 1
@@ -480,11 +461,8 @@ class BasicConsolidation(BaseStrategy):
if m > 0:
self.number_of_migrations = self.number_of_migrations + m
unsuccessful_migration = 0
for a in tmp_vm_migration_schedule:
self.solution.add_change_request(a)
else:
unsuccessful_migration += 1
# self.print_utilization(current_model)
infos = {
"number_of_migrations": self.number_of_migrations,
"number_of_nodes_released": self.number_of_released_nodes,

View File

@@ -17,22 +17,27 @@
# limitations under the License.
#
from oslo_log import log
from watcher.decision_engine.strategy.strategies.base import BaseStrategy
from watcher.decision_engine.actions.nop import Nop
from watcher.decision_engine.strategy.strategies.base import BaseStrategy
LOG = log.getLogger(__name__)
class DummyStrategy(BaseStrategy):
DEFAULT_NAME = "dummy"
DEFAULT_DESCRIPTION = "Dummy Strategy"
NOP = "nop"
def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION):
super(DummyStrategy, self).__init__(name, description)
def execute(self, model):
n = Nop()
self.solution.add_change_request(n)
parameters = {'message': 'hello World'}
self.solution.add_action(action_type=self.NOP,
applies_to="",
input_parameters=parameters)
# todo(jed) add a new action to test the flow
# with two differents actions
return self.solution

View File

@@ -20,11 +20,8 @@ from oslo_log import log
from watcher._i18n import _LE
from watcher.common import exception as wexc
from watcher.decision_engine.actions.migration import Migrate
from watcher.decision_engine.actions.migration import MigrationType
from watcher.decision_engine.model.resource import ResourceType
from watcher.decision_engine.model.vm_state import VMState
from watcher.decision_engine.strategy.common.level import StrategyLevel
from watcher.decision_engine.strategy.strategies.base import BaseStrategy
from watcher.metrics_engine.cluster_history.ceilometer import \
CeilometerClusterHistory
@@ -41,6 +38,8 @@ class OutletTempControl(BaseStrategy):
# Unit: degree C
THRESHOLD = 35.0
MIGRATION = "migrate"
def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION):
"""[PoC]Outlet temperature control using live migration
@@ -230,21 +229,19 @@ class OutletTempControl(BaseStrategy):
LOG.info(_LE("No proper target host could be found"))
return self.solution
dest_servers = sorted(dest_servers,
reverse=False,
key=lambda x: (x["outlet_temp"]))
dest_servers = sorted(dest_servers, key=lambda x: (x["outlet_temp"]))
# always use the host with lowerest outlet temperature
mig_dst_hypervisor = dest_servers[0]['hv']
# generate solution to migrate the vm to the dest server,
if current_model.get_mapping().migrate_vm(vm_src,
mig_src_hypervisor,
mig_dst_hypervisor):
live_migrate = Migrate(vm_src,
mig_src_hypervisor,
mig_dst_hypervisor)
live_migrate.migration_type = MigrationType.pre_copy
live_migrate.level = StrategyLevel.conservative
self.solution.add_change_request(live_migrate)
parameters = {'migration_type': 'live',
'src_hypervisor_uuid': mig_src_hypervisor,
'dst_hypervisor_uuid': mig_dst_hypervisor}
self.solution.add_action(action_type=self.MIGRATION,
applies_to=vm_src,
input_parameters=parameters)
self.solution.model = current_model

View File

@@ -83,7 +83,7 @@ def get_test_action(**kwargs):
'id': kwargs.get('id', 1),
'uuid': kwargs.get('uuid', '10a47dd1-4874-4298-91cf-eff046dbdb8d'),
'action_plan_id': kwargs.get('action_plan_id', 1),
'action_type': kwargs.get('action_type', 'COLD_MIGRATION'),
'action_type': kwargs.get('action_type', 'nop'),
'applies_to': kwargs.get('applies_to',
'10a47dd1-4874-4298-91cf-eff046dbdb8d'),
'input_parameters': kwargs.get('input_parameters', {'key1': 'val1',

View File

@@ -16,19 +16,17 @@
import mock
from watcher.common import exception
from watcher.common import utils
from watcher.db import api as db_api
from watcher.decision_engine.actions.base import BaseAction
from watcher.decision_engine.planner.default import DefaultPlanner
from watcher.decision_engine.solution.default import DefaultSolution
from watcher.decision_engine.strategy.strategies.basic_consolidation import \
BasicConsolidation
from watcher.tests.db import base
from watcher.tests.db import utils as db_utils
from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state\
from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \
import FakerModelCollector
from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector\
from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector \
import FakerMetricsCollector
from watcher.tests.objects import utils as obj_utils
@@ -39,8 +37,8 @@ class SolutionFaker(object):
metrics = FakerMetricsCollector()
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("basic", "Basic offline consolidation")
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
sercon.ceilometer = mock.\
MagicMock(get_statistics=metrics.mock_get_statistics)
return sercon.execute(current_state_cluster.generate_scenario_1())
@@ -50,29 +48,32 @@ class SolutionFakerSingleHyp(object):
metrics = FakerMetricsCollector()
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("basic", "Basic offline consolidation")
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
sercon.ceilometer = \
mock.MagicMock(get_statistics=metrics.mock_get_statistics)
return sercon.execute(
current_state_cluster.generate_scenario_3_with_2_hypervisors())
class TestActionScheduling(base.DbTestCase):
scenarios = [
(str(action_cls), {"fake_action": mock.Mock(spec=action_cls)})
for action_cls in BaseAction.__subclasses__()]
def test_schedule_actions(self):
default_planner = DefaultPlanner()
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
dummy_solution = DefaultSolution()
dummy_solution.add_change_request(self.fake_action)
solution = DefaultSolution()
parameters = {
"src_uuid_hypervisor": "server1",
"dst_uuid_hypervisor": "server2",
}
solution.add_action(action_type="migrate",
applies_to="b199db0c-1408-4d52-b5a5-5ca14de0ff36",
input_parameters=parameters)
with mock.patch.object(
DefaultPlanner, "create_action",
wraps=default_planner.create_action) as m_create_action:
action_plan = default_planner.schedule(
self.context, audit.id, dummy_solution
self.context, audit.id, solution
)
self.assertIsNotNone(action_plan.uuid)
@@ -112,14 +113,6 @@ class TestDefaultPlanner(base.DbTestCase):
audit.id, fake_solution)
self.assertIsNotNone(action_plan.uuid)
def test_schedule_raise(self):
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
fake_solution = SolutionFaker.build()
fake_solution.actions[0] = "valeur_qcq"
self.assertRaises(exception.ActionNotFound,
self.default_planner.schedule,
self.context, audit.id, fake_solution)
def test_schedule_scheduled_empty(self):
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
fake_solution = SolutionFakerSingleHyp.build()

View File

@@ -0,0 +1,39 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.solution.default import DefaultSolution
from watcher.tests import base
class TestDefaultSolution(base.BaseTestCase):
def test_default_solution(self):
solution = DefaultSolution()
parameters = {
"src_uuid_hypervisor": "server1",
"dst_uuid_hypervisor": "server2",
}
solution.add_action(action_type="nop",
applies_to="b199db0c-1408-4d52-b5a5-5ca14de0ff36",
input_parameters=parameters)
self.assertEqual(len(solution.actions), 1)
expected_action_type = "nop"
expected_applies_to = "b199db0c-1408-4d52-b5a5-5ca14de0ff36"
expected_parameters = parameters
self.assertEqual(solution.actions[0].get('action_type'),
expected_action_type)
self.assertEqual(solution.actions[0].get('applies_to'),
expected_applies_to)
self.assertEqual(solution.actions[0].get('input_parameters'),
expected_parameters)

View File

@@ -17,15 +17,9 @@
# limitations under the License.
#
from collections import Counter
import mock
from mock import MagicMock
from watcher.common import exception
from watcher.decision_engine.actions.hypervisor_state import \
ChangeHypervisorState
from watcher.decision_engine.actions.migration import Migrate
from watcher.decision_engine.actions.power_state import ChangePowerState
from watcher.decision_engine.model.model_root import ModelRoot
from watcher.decision_engine.strategy.strategies.basic_consolidation import \
BasicConsolidation
@@ -52,7 +46,7 @@ class TestBasicConsolidation(base.BaseTestCase):
def test_basic_consolidation_score_hypervisor(self):
cluster = self.fake_cluster.generate_scenario_1()
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
node_1_score = 0.023333333333333317
@@ -74,7 +68,7 @@ class TestBasicConsolidation(base.BaseTestCase):
def test_basic_consolidation_score_vm(self):
cluster = self.fake_cluster.generate_scenario_1()
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
vm_0_score = 0.023333333333333317
@@ -97,7 +91,7 @@ class TestBasicConsolidation(base.BaseTestCase):
def test_basic_consolidation_score_vm_disk(self):
cluster = self.fake_cluster.generate_scenario_5_with_vm_disk_0()
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
vm_0_score = 0.023333333333333355
@@ -106,7 +100,7 @@ class TestBasicConsolidation(base.BaseTestCase):
def test_basic_consolidation_weight(self):
cluster = self.fake_cluster.generate_scenario_1()
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
cores = 16
@@ -138,17 +132,12 @@ class TestBasicConsolidation(base.BaseTestCase):
metrics = FakerMetricsCollector()
metrics.empty_one_metric("CPU_COMPUTE")
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.assertRaises(exception.ClusterStateNotDefined,
sercon.calculate_score_vm, "VM_1", None)
def test_print_utilization_raise_cluster_state_not_found(self):
sercon = BasicConsolidation()
self.assertRaises(exception.ClusterStateNotDefined,
sercon.print_utilization, None)
def test_check_migration(self):
sercon = BasicConsolidation()
fake_cluster = FakerModelCollector()
@@ -182,34 +171,28 @@ class TestBasicConsolidation(base.BaseTestCase):
def test_basic_consolidation_migration(self):
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
solution = sercon.execute(
self.fake_cluster.generate_scenario_2())
self.fake_cluster.generate_scenario_3_with_2_hypervisors())
actions_counter = Counter(
[type(action) for action in solution.actions])
[action.get('action_type') for action in solution.actions])
expected_num_migrations = 0
expected_num_migrations = 1
expected_power_state = 0
expected_change_hypervisor_state = 0
num_migrations = actions_counter.get(Migrate, 0)
num_migrations = actions_counter.get("migrate", 0)
num_hypervisor_state_change = actions_counter.get(
ChangeHypervisorState, 0)
num_power_state_change = actions_counter.get(
ChangePowerState, 0)
"change_hypervisor_state", 0)
self.assertEqual(num_migrations, expected_num_migrations)
self.assertEqual(num_hypervisor_state_change, expected_power_state)
self.assertEqual(num_power_state_change,
expected_change_hypervisor_state)
def test_execute_cluster_empty(self):
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("sercon", "Basic offline consolidation")
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = current_state_cluster.generate_random(0, 0)
self.assertRaises(exception.ClusterEmpty, sercon.execute, model)
@@ -217,7 +200,7 @@ class TestBasicConsolidation(base.BaseTestCase):
# calculate_weight
def test_execute_no_workload(self):
sercon = BasicConsolidation()
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
current_state_cluster = FakerModelCollector()

View File

@@ -17,12 +17,9 @@
# limitations under the License.
#
from collections import Counter
from mock import MagicMock
import mock
from watcher.common import exception
from watcher.decision_engine.actions.migration import Migrate
from watcher.decision_engine.model.model_root import ModelRoot
from watcher.decision_engine.model.resource import ResourceType
from watcher.decision_engine.strategy.strategies.outlet_temp_control import \
@@ -59,7 +56,7 @@ class TestOutletTempControl(base.BaseTestCase):
def test_group_hosts_by_outlet_temp(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = OutletTempControl()
strategy.ceilometer = MagicMock(
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
self.assertEqual(h1[0]['hv'].uuid, 'Node_1')
@@ -68,7 +65,7 @@ class TestOutletTempControl(base.BaseTestCase):
def test_choose_vm_to_migrate(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = OutletTempControl()
strategy.ceilometer = MagicMock(
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
@@ -78,7 +75,7 @@ class TestOutletTempControl(base.BaseTestCase):
def test_filter_dest_servers(self):
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
strategy = OutletTempControl()
strategy.ceilometer = MagicMock(
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
h1, h2 = strategy.group_hosts_by_outlet_temp(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
@@ -99,14 +96,14 @@ class TestOutletTempControl(base.BaseTestCase):
def test_execute_cluster_empty(self):
current_state_cluster = FakerModelCollector()
strategy = OutletTempControl()
strategy.ceilometer = MagicMock(
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = current_state_cluster.generate_random(0, 0)
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
def test_execute_no_workload(self):
strategy = OutletTempControl()
strategy.ceilometer = MagicMock(
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
current_state_cluster = FakerModelCollector()
@@ -118,12 +115,12 @@ class TestOutletTempControl(base.BaseTestCase):
def test_execute(self):
strategy = OutletTempControl()
strategy.ceilometer = MagicMock(
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
solution = strategy.execute(model)
actions_counter = Counter(
[type(action) for action in solution.actions])
[action.get('action_type') for action in solution.actions])
num_migrations = actions_counter.get(Migrate, 0)
num_migrations = actions_counter.get("migrate", 0)
self.assertEqual(num_migrations, 1)

View File

@@ -1,30 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.actions.base import BaseAction
from watcher.tests import base
class TestAction(base.TestCase):
def test_get_priority(self):
ma = BaseAction()
ma.priority = 3
self.assertEqual(ma.priority, 3)
def test_get_level(self):
ma = BaseAction()
ma.level = 5
self.assertEqual(ma.level, 5)

View File

@@ -1,24 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.solution.default import DefaultSolution
from watcher.tests import base
class TestDefaultSolution(base.BaseTestCase):
def test_default_solution(self):
solution = DefaultSolution()
solution.add_change_request("BLA")
self.assertEqual(solution.actions[0], "BLA")