Merge "Use taskflow library for building and executing action plans"
This commit is contained in:
@@ -20,7 +20,7 @@ from mock import call
|
||||
from mock import MagicMock
|
||||
|
||||
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
||||
from watcher.applier.messaging.events import Events
|
||||
from watcher.applier.messaging.event_types import EventTypes
|
||||
from watcher.objects.action_plan import Status
|
||||
from watcher.objects import ActionPlan
|
||||
from watcher.tests.db.base import DbTestCase
|
||||
@@ -33,17 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
||||
self.action_plan = obj_utils.create_test_action_plan(
|
||||
self.context)
|
||||
|
||||
def test_launch_action_plan_wihout_errors(self):
|
||||
try:
|
||||
|
||||
command = DefaultActionPlanHandler(self.context, MagicMock(),
|
||||
self.action_plan.uuid)
|
||||
command.execute()
|
||||
except Exception as e:
|
||||
self.fail(
|
||||
"The ActionPlan should be trigged wihtour error" + unicode(e))
|
||||
|
||||
def test_launch_action_plan_state_failed(self):
|
||||
def test_launch_action_plan(self):
|
||||
command = DefaultActionPlanHandler(self.context, MagicMock(),
|
||||
self.action_plan.uuid)
|
||||
command.execute()
|
||||
@@ -57,10 +47,10 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
||||
self.action_plan.uuid)
|
||||
command.execute()
|
||||
|
||||
call_on_going = call(Events.LAUNCH_ACTION_PLAN.name, {
|
||||
call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||
'action_plan_status': Status.ONGOING,
|
||||
'action_plan__uuid': self.action_plan.uuid})
|
||||
call_succeeded = call(Events.LAUNCH_ACTION_PLAN.name, {
|
||||
call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||
'action_plan_status': Status.SUCCEEDED,
|
||||
'action_plan__uuid': self.action_plan.uuid})
|
||||
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2016 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from watcher.applier.actions import base as abase
|
||||
from watcher.applier.actions.loading import default
|
||||
from watcher.tests import base
|
||||
|
||||
|
||||
class TestDefaultActionLoader(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestDefaultActionLoader, self).setUp()
|
||||
self.loader = default.DefaultActionLoader()
|
||||
|
||||
def test_endpoints(self):
|
||||
for endpoint in self.loader.list_available():
|
||||
loaded = self.loader.load(endpoint)
|
||||
self.assertIsNotNone(loaded)
|
||||
self.assertIsInstance(loaded, abase.BaseAction)
|
||||
@@ -1,56 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import mock
|
||||
|
||||
from watcher.applier.execution import default
|
||||
from watcher.common import utils
|
||||
from watcher import objects
|
||||
from watcher.tests.db import base
|
||||
|
||||
|
||||
class TestDefaultActionPlanExecutor(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestDefaultActionPlanExecutor, self).setUp()
|
||||
self.executor = default.DefaultActionPlanExecutor(mock.MagicMock(),
|
||||
self.context)
|
||||
|
||||
def test_execute(self):
|
||||
actions = mock.MagicMock()
|
||||
result = self.executor.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
def test_execute_with_actions(self):
|
||||
actions = []
|
||||
action = {
|
||||
'uuid': utils.generate_uuid(),
|
||||
'action_plan_id': 0,
|
||||
'action_type': "nop",
|
||||
'applies_to': '',
|
||||
'input_parameters': {'state': 'OFFLINE'},
|
||||
'state': objects.action.Status.PENDING,
|
||||
'alarm': None,
|
||||
'next': None,
|
||||
}
|
||||
new_action = objects.Action(self.context, **action)
|
||||
new_action.create(self.context)
|
||||
new_action.save()
|
||||
actions.append(objects.Action.get_by_uuid(self.context,
|
||||
action['uuid']))
|
||||
result = self.executor.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
@@ -18,8 +18,9 @@
|
||||
#
|
||||
|
||||
|
||||
from mock import MagicMock
|
||||
from watcher.applier.messaging.trigger import TriggerActionPlan
|
||||
import mock
|
||||
|
||||
from watcher.applier.messaging import trigger
|
||||
from watcher.common import utils
|
||||
from watcher.tests import base
|
||||
|
||||
@@ -27,8 +28,8 @@ from watcher.tests import base
|
||||
class TestTriggerActionPlan(base.TestCase):
|
||||
def __init__(self, *args, **kwds):
|
||||
super(TestTriggerActionPlan, self).__init__(*args, **kwds)
|
||||
self.applier = MagicMock()
|
||||
self.endpoint = TriggerActionPlan(self.applier)
|
||||
self.applier = mock.MagicMock()
|
||||
self.endpoint = trigger.TriggerActionPlan(self.applier)
|
||||
|
||||
def setUp(self):
|
||||
super(TestTriggerActionPlan, self).setUp()
|
||||
0
watcher/tests/applier/workflow_engine/__init__.py
Normal file
0
watcher/tests/applier/workflow_engine/__init__.py
Normal file
@@ -0,0 +1,32 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2016 b<>com
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from watcher.applier.workflow_engine import base as wbase
|
||||
from watcher.applier.workflow_engine.loading import default
|
||||
from watcher.tests import base
|
||||
|
||||
|
||||
class TestDefaultActionLoader(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestDefaultActionLoader, self).setUp()
|
||||
self.loader = default.DefaultWorkFlowEngineLoader()
|
||||
|
||||
def test_endpoints(self):
|
||||
for endpoint in self.loader.list_available():
|
||||
loaded = self.loader.load(endpoint)
|
||||
self.assertIsNotNone(loaded)
|
||||
self.assertIsInstance(loaded, wbase.BaseWorkFlowEngine)
|
||||
@@ -0,0 +1,164 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2015 b<>com
|
||||
#
|
||||
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import abc
|
||||
import mock
|
||||
|
||||
import six
|
||||
from stevedore import driver
|
||||
from stevedore import extension
|
||||
|
||||
from watcher.applier.actions import base as abase
|
||||
from watcher.applier.workflow_engine import default as tflow
|
||||
from watcher.common import utils
|
||||
from watcher import objects
|
||||
from watcher.tests.db import base
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class FakeAction(abase.BaseAction):
|
||||
def precondition(self):
|
||||
pass
|
||||
|
||||
def revert(self):
|
||||
pass
|
||||
|
||||
def execute(self):
|
||||
raise Exception()
|
||||
|
||||
@classmethod
|
||||
def namespace(cls):
|
||||
return "TESTING"
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
return 'fake_action'
|
||||
|
||||
|
||||
class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestDefaultWorkFlowEngine, self).setUp()
|
||||
self.engine = tflow.DefaultWorkFlowEngine()
|
||||
self.engine.context = self.context
|
||||
self.engine.applier_manager = mock.MagicMock()
|
||||
|
||||
def test_execute(self):
|
||||
actions = mock.MagicMock()
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
def create_action(self, action_type, applies_to, parameters, next):
|
||||
action = {
|
||||
'uuid': utils.generate_uuid(),
|
||||
'action_plan_id': 0,
|
||||
'action_type': action_type,
|
||||
'applies_to': applies_to,
|
||||
'input_parameters': parameters,
|
||||
'state': objects.action.Status.PENDING,
|
||||
'alarm': None,
|
||||
'next': next,
|
||||
}
|
||||
new_action = objects.Action(self.context, **action)
|
||||
new_action.create(self.context)
|
||||
new_action.save()
|
||||
return new_action
|
||||
|
||||
def check_action_state(self, action, expected_state):
|
||||
to_check = objects.Action.get_by_uuid(self.context, action.uuid)
|
||||
self.assertEqual(to_check.state, expected_state)
|
||||
|
||||
def check_actions_state(self, actions, expected_state):
|
||||
for a in actions:
|
||||
self.check_action_state(a, expected_state)
|
||||
|
||||
def test_execute_with_no_actions(self):
|
||||
actions = []
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
def test_execute_with_one_action(self):
|
||||
actions = [self.create_action("nop", "", {'message': 'test'}, None)]
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
self.check_actions_state(actions, objects.action.Status.SUCCEEDED)
|
||||
|
||||
def test_execute_with_two_actions(self):
|
||||
actions = []
|
||||
next = self.create_action("sleep", "", {'duration': '0'}, None)
|
||||
first = self.create_action("nop", "", {'message': 'test'}, next.id)
|
||||
|
||||
actions.append(first)
|
||||
actions.append(next)
|
||||
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
self.check_actions_state(actions, objects.action.Status.SUCCEEDED)
|
||||
|
||||
def test_execute_with_three_actions(self):
|
||||
actions = []
|
||||
next2 = self.create_action("nop", "vm1", {'message': 'next'}, None)
|
||||
next = self.create_action("sleep", "vm1", {'duration': '0'}, next2.id)
|
||||
first = self.create_action("nop", "vm1", {'message': 'hello'}, next.id)
|
||||
self.check_action_state(first, objects.action.Status.PENDING)
|
||||
self.check_action_state(next, objects.action.Status.PENDING)
|
||||
self.check_action_state(next2, objects.action.Status.PENDING)
|
||||
|
||||
actions.append(first)
|
||||
actions.append(next)
|
||||
actions.append(next2)
|
||||
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, True)
|
||||
self.check_actions_state(actions, objects.action.Status.SUCCEEDED)
|
||||
|
||||
def test_execute_with_exception(self):
|
||||
actions = []
|
||||
next2 = self.create_action("no_exist",
|
||||
"vm1", {'message': 'next'}, None)
|
||||
next = self.create_action("sleep", "vm1",
|
||||
{'duration': '0'}, next2.id)
|
||||
first = self.create_action("nop", "vm1",
|
||||
{'message': 'hello'}, next.id)
|
||||
|
||||
self.check_action_state(first, objects.action.Status.PENDING)
|
||||
self.check_action_state(next, objects.action.Status.PENDING)
|
||||
self.check_action_state(next2, objects.action.Status.PENDING)
|
||||
actions.append(first)
|
||||
actions.append(next)
|
||||
actions.append(next2)
|
||||
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, False)
|
||||
self.check_action_state(first, objects.action.Status.SUCCEEDED)
|
||||
self.check_action_state(next, objects.action.Status.SUCCEEDED)
|
||||
self.check_action_state(next2, objects.action.Status.FAILED)
|
||||
|
||||
@mock.patch("watcher.common.loader.default.DriverManager")
|
||||
def test_execute_with_action_exception(self, m_driver):
|
||||
m_driver.return_value = driver.DriverManager.make_test_instance(
|
||||
extension=extension.Extension(name=FakeAction.get_name(),
|
||||
entry_point="%s:%s" % (
|
||||
FakeAction.__module__,
|
||||
FakeAction.__name__),
|
||||
plugin=FakeAction,
|
||||
obj=None),
|
||||
namespace=FakeAction.namespace())
|
||||
actions = [self.create_action("dontcare", "vm1", {}, None)]
|
||||
result = self.engine.execute(actions)
|
||||
self.assertEqual(result, False)
|
||||
self.check_action_state(actions[0], objects.action.Status.FAILED)
|
||||
@@ -18,25 +18,26 @@ import mock
|
||||
|
||||
from watcher.common import utils
|
||||
from watcher.db import api as db_api
|
||||
from watcher.decision_engine.planner.default import DefaultPlanner
|
||||
from watcher.decision_engine.solution.default import DefaultSolution
|
||||
from watcher.decision_engine.strategy.strategies.basic_consolidation import \
|
||||
BasicConsolidation
|
||||
from watcher.decision_engine.planner import default as pbase
|
||||
from watcher.decision_engine.solution import default as dsol
|
||||
from watcher.decision_engine.strategy import strategies
|
||||
from watcher import objects
|
||||
from watcher.tests.db import base
|
||||
from watcher.tests.db import utils as db_utils
|
||||
from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \
|
||||
import FakerModelCollector
|
||||
from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector \
|
||||
import FakerMetricsCollector
|
||||
from watcher.tests.decision_engine.strategy.strategies \
|
||||
import faker_cluster_state
|
||||
from watcher.tests.decision_engine.strategy.strategies \
|
||||
import faker_metrics_collector as fake
|
||||
from watcher.tests.objects import utils as obj_utils
|
||||
|
||||
|
||||
class SolutionFaker(object):
|
||||
@staticmethod
|
||||
def build():
|
||||
metrics = FakerMetricsCollector()
|
||||
current_state_cluster = FakerModelCollector()
|
||||
sercon = BasicConsolidation("basic", "Basic offline consolidation")
|
||||
metrics = fake.FakerMetricsCollector()
|
||||
current_state_cluster = faker_cluster_state.FakerModelCollector()
|
||||
sercon = strategies.BasicConsolidation("basic",
|
||||
"Basic offline consolidation")
|
||||
sercon.ceilometer = mock.\
|
||||
MagicMock(get_statistics=metrics.mock_get_statistics)
|
||||
return sercon.execute(current_state_cluster.generate_scenario_1())
|
||||
@@ -45,9 +46,10 @@ class SolutionFaker(object):
|
||||
class SolutionFakerSingleHyp(object):
|
||||
@staticmethod
|
||||
def build():
|
||||
metrics = FakerMetricsCollector()
|
||||
current_state_cluster = FakerModelCollector()
|
||||
sercon = BasicConsolidation("basic", "Basic offline consolidation")
|
||||
metrics = fake.FakerMetricsCollector()
|
||||
current_state_cluster = faker_cluster_state.FakerModelCollector()
|
||||
sercon = strategies.BasicConsolidation("basic",
|
||||
"Basic offline consolidation")
|
||||
sercon.ceilometer = \
|
||||
mock.MagicMock(get_statistics=metrics.mock_get_statistics)
|
||||
|
||||
@@ -57,9 +59,9 @@ class SolutionFakerSingleHyp(object):
|
||||
|
||||
class TestActionScheduling(base.DbTestCase):
|
||||
def test_schedule_actions(self):
|
||||
default_planner = DefaultPlanner()
|
||||
default_planner = pbase.DefaultPlanner()
|
||||
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
|
||||
solution = DefaultSolution()
|
||||
solution = dsol.DefaultSolution()
|
||||
|
||||
parameters = {
|
||||
"src_uuid_hypervisor": "server1",
|
||||
@@ -70,7 +72,7 @@ class TestActionScheduling(base.DbTestCase):
|
||||
input_parameters=parameters)
|
||||
|
||||
with mock.patch.object(
|
||||
DefaultPlanner, "create_action",
|
||||
pbase.DefaultPlanner, "create_action",
|
||||
wraps=default_planner.create_action) as m_create_action:
|
||||
action_plan = default_planner.schedule(
|
||||
self.context, audit.id, solution
|
||||
@@ -78,12 +80,46 @@ class TestActionScheduling(base.DbTestCase):
|
||||
|
||||
self.assertIsNotNone(action_plan.uuid)
|
||||
self.assertEqual(m_create_action.call_count, 1)
|
||||
filters = {'action_plan_id': action_plan.id}
|
||||
actions = objects.Action.dbapi.get_action_list(self.context, filters)
|
||||
self.assertEqual(actions[0].action_type, "migrate")
|
||||
|
||||
def test_schedule_two_actions(self):
|
||||
default_planner = pbase.DefaultPlanner()
|
||||
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
|
||||
solution = dsol.DefaultSolution()
|
||||
|
||||
parameters = {
|
||||
"src_uuid_hypervisor": "server1",
|
||||
"dst_uuid_hypervisor": "server2",
|
||||
}
|
||||
solution.add_action(action_type="migrate",
|
||||
applies_to="b199db0c-1408-4d52-b5a5-5ca14de0ff36",
|
||||
input_parameters=parameters)
|
||||
|
||||
solution.add_action(action_type="nop",
|
||||
applies_to="",
|
||||
input_parameters={})
|
||||
|
||||
with mock.patch.object(
|
||||
pbase.DefaultPlanner, "create_action",
|
||||
wraps=default_planner.create_action) as m_create_action:
|
||||
action_plan = default_planner.schedule(
|
||||
self.context, audit.id, solution
|
||||
)
|
||||
self.assertIsNotNone(action_plan.uuid)
|
||||
self.assertEqual(m_create_action.call_count, 2)
|
||||
# check order
|
||||
filters = {'action_plan_id': action_plan.id}
|
||||
actions = objects.Action.dbapi.get_action_list(self.context, filters)
|
||||
self.assertEqual(actions[0].action_type, "nop")
|
||||
self.assertEqual(actions[1].action_type, "migrate")
|
||||
|
||||
|
||||
class TestDefaultPlanner(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestDefaultPlanner, self).setUp()
|
||||
self.default_planner = DefaultPlanner()
|
||||
self.default_planner = pbase.DefaultPlanner()
|
||||
obj_utils.create_test_audit_template(self.context)
|
||||
|
||||
p = mock.patch.object(db_api.BaseConnection, 'create_action_plan')
|
||||
|
||||
Reference in New Issue
Block a user