diff --git a/setup.cfg b/setup.cfg index 917dd0e4c..e49233296 100644 --- a/setup.cfg +++ b/setup.cfg @@ -104,6 +104,7 @@ watcher_workflow_engines = watcher_planners = weight = watcher.decision_engine.planner.weight:WeightPlanner workload_stabilization = watcher.decision_engine.planner.workload_stabilization:WorkloadStabilizationPlanner + node_resource_consolidation = watcher.decision_engine.planner.node_resource_consolidation:NodeResourceConsolidationPlanner watcher_cluster_data_model_collectors = compute = watcher.decision_engine.model.collector.nova:NovaClusterDataModelCollector diff --git a/watcher/decision_engine/planner/node_resource_consolidation.py b/watcher/decision_engine/planner/node_resource_consolidation.py new file mode 100644 index 000000000..9f0602975 --- /dev/null +++ b/watcher/decision_engine/planner/node_resource_consolidation.py @@ -0,0 +1,163 @@ +# -*- encoding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from oslo_log import log + +from watcher.common import exception +from watcher.common import utils +from watcher.decision_engine.model import element +from watcher.decision_engine.planner import base +from watcher import objects + +LOG = log.getLogger(__name__) + + +class NodeResourceConsolidationPlanner(base.BasePlanner): + """Node Resource Consolidation planner implementation + + This implementation preserves the original order of actions in the + solution and try to parallelize actions which have the same action type. + + *Limitations* + + - This is a proof of concept that is not meant to be used in production + """ + + def create_action(self, + action_plan_id, + action_type, + input_parameters=None): + uuid = utils.generate_uuid() + action = { + 'uuid': uuid, + 'action_plan_id': int(action_plan_id), + 'action_type': action_type, + 'input_parameters': input_parameters, + 'state': objects.action.State.PENDING, + 'parents': None + } + + return action + + def schedule(self, context, audit_id, solution): + LOG.debug('Creating an action plan for the audit uuid: %s', audit_id) + action_plan = self._create_action_plan(context, audit_id, solution) + + actions = list(solution.actions) + if len(actions) == 0: + LOG.warning("The action plan is empty") + action_plan.state = objects.action_plan.State.SUCCEEDED + action_plan.save() + return action_plan + + node_disabled_actions = [] + node_enabled_actions = [] + node_migrate_actions = {} + for action in actions: + action_type = action.get('action_type') + parameters = action.get('input_parameters') + json_action = self.create_action( + action_plan_id=action_plan.id, + action_type=action_type, + input_parameters=parameters) + # classing actions + if action_type == 'change_nova_service_state': + if parameters.get('state') == ( + element.ServiceState.DISABLED.value): + node_disabled_actions.append(json_action) + else: + node_enabled_actions.append(json_action) + elif action_type == 'migrate': + source_node = parameters.get('source_node') + if source_node in node_migrate_actions: + node_migrate_actions[source_node].append(json_action) + else: + node_migrate_actions[source_node] = [json_action] + else: + raise exception.UnsupportedActionType( + action_type=action.get("action_type")) + + # creating actions + mig_parents = [] + for action in node_disabled_actions: + mig_parents.append(action['uuid']) + self._create_action(context, action) + + enabled_parents = [] + for actions in node_migrate_actions.values(): + enabled_parents.append(actions[-1].get('uuid')) + pre_action_uuid = [] + for action in actions: + action['parents'] = mig_parents + pre_action_uuid + pre_action_uuid = [action['uuid']] + self._create_action(context, action) + + for action in node_enabled_actions: + action['parents'] = enabled_parents + self._create_action(context, action) + + self._create_efficacy_indicators( + context, action_plan.id, solution.efficacy_indicators) + + return action_plan + + def _create_action_plan(self, context, audit_id, solution): + strategy = objects.Strategy.get_by_name( + context, solution.strategy.name) + + action_plan_dict = { + 'uuid': utils.generate_uuid(), + 'audit_id': audit_id, + 'strategy_id': strategy.id, + 'state': objects.action_plan.State.RECOMMENDED, + 'global_efficacy': solution.global_efficacy, + } + + new_action_plan = objects.ActionPlan(context, **action_plan_dict) + new_action_plan.create() + + return new_action_plan + + def _create_efficacy_indicators(self, context, action_plan_id, indicators): + efficacy_indicators = [] + for indicator in indicators: + efficacy_indicator_dict = { + 'uuid': utils.generate_uuid(), + 'name': indicator.name, + 'description': indicator.description, + 'unit': indicator.unit, + 'value': indicator.value, + 'action_plan_id': action_plan_id, + } + new_efficacy_indicator = objects.EfficacyIndicator( + context, **efficacy_indicator_dict) + new_efficacy_indicator.create() + + efficacy_indicators.append(new_efficacy_indicator) + return efficacy_indicators + + def _create_action(self, context, _action): + try: + LOG.debug("Creating the %s in the Watcher database", + _action.get("action_type")) + + new_action = objects.Action(context, **_action) + new_action.create() + + return new_action + except Exception as exc: + LOG.exception(exc) + raise diff --git a/watcher/tests/decision_engine/planner/test_node_resource_consolidation.py b/watcher/tests/decision_engine/planner/test_node_resource_consolidation.py new file mode 100644 index 000000000..637bacea2 --- /dev/null +++ b/watcher/tests/decision_engine/planner/test_node_resource_consolidation.py @@ -0,0 +1,245 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2019 ZTE Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from watcher.common import exception +from watcher.common import utils +from watcher.db import api as db_api +from watcher.decision_engine.planner import \ + node_resource_consolidation as pbase +from watcher.decision_engine.solution import default as dsol +from watcher import objects +from watcher.tests.db import base +from watcher.tests.db import utils as db_utils +from watcher.tests.objects import utils as obj_utils + + +class TestActionScheduling(base.DbTestCase): + + def setUp(self): + super(TestActionScheduling, self).setUp() + self.goal = db_utils.create_test_goal(name="server_consolidation") + self.strategy = db_utils.create_test_strategy( + name="node_resource_consolidation") + self.audit = db_utils.create_test_audit( + uuid=utils.generate_uuid(), strategy_id=self.strategy.id) + self.planner = pbase.NodeResourceConsolidationPlanner(mock.Mock()) + + def test_schedule_actions(self): + solution = dsol.DefaultSolution( + goal=mock.Mock(), strategy=self.strategy) + + parameters = { + "source_node": "host1", + "destination_node": "host2", + } + solution.add_action(action_type="migrate", + resource_id="b199db0c-1408-4d52-b5a5-5ca14de0ff36", + input_parameters=parameters) + + with mock.patch.object( + pbase.NodeResourceConsolidationPlanner, "create_action", + wraps=self.planner.create_action + ) as m_create_action: + action_plan = self.planner.schedule( + self.context, self.audit.id, solution) + + self.assertIsNotNone(action_plan.uuid) + self.assertEqual(1, m_create_action.call_count) + filters = {'action_plan_id': action_plan.id} + actions = objects.Action.dbapi.get_action_list(self.context, filters) + self.assertEqual("migrate", actions[0].action_type) + + def test_schedule_two_actions(self): + solution = dsol.DefaultSolution( + goal=mock.Mock(), strategy=self.strategy) + + server1_uuid = "b199db0c-1408-4d52-b5a5-5ca14de0ff36" + server2_uuid = "b199db0c-1408-4d52-b5a5-5ca14de0ff37" + solution.add_action(action_type="migrate", + resource_id=server1_uuid, + input_parameters={ + "source_node": "host1", + "destination_node": "host2", + }) + + solution.add_action(action_type="migrate", + resource_id=server2_uuid, + input_parameters={ + "source_node": "host1", + "destination_node": "host3", + }) + + with mock.patch.object( + pbase.NodeResourceConsolidationPlanner, "create_action", + wraps=self.planner.create_action + ) as m_create_action: + action_plan = self.planner.schedule( + self.context, self.audit.id, solution) + self.assertIsNotNone(action_plan.uuid) + self.assertEqual(2, m_create_action.call_count) + # check order + filters = {'action_plan_id': action_plan.id} + actions = objects.Action.dbapi.get_action_list(self.context, filters) + self.assertEqual( + server1_uuid, actions[0]['input_parameters'].get('resource_id')) + self.assertEqual( + server2_uuid, actions[1]['input_parameters'].get('resource_id')) + self.assertIn(actions[0]['uuid'], actions[1]['parents']) + + def test_schedule_actions_with_unknown_action(self): + solution = dsol.DefaultSolution( + goal=mock.Mock(), strategy=self.strategy) + + parameters = { + "src_uuid_node": "host1", + "dst_uuid_node": "host2", + } + solution.add_action(action_type="migrate", + resource_id="b199db0c-1408-4d52-b5a5-5ca14de0ff36", + input_parameters=parameters) + + solution.add_action(action_type="new_action_type", + resource_id="", + input_parameters={}) + + with mock.patch.object( + pbase.NodeResourceConsolidationPlanner, "create_action", + wraps=self.planner.create_action + ) as m_create_action: + self.assertRaises( + exception.UnsupportedActionType, + self.planner.schedule, + self.context, self.audit.id, solution) + self.assertEqual(2, m_create_action.call_count) + + def test_schedule_migrate_change_state_actions(self): + solution = dsol.DefaultSolution( + goal=mock.Mock(), strategy=self.strategy) + + solution.add_action(action_type="change_nova_service_state", + resource_id="b199db0c-1408-4d52-b5a5-5ca14de0ff36", + input_parameters={"state": "disabled"}) + + solution.add_action(action_type="change_nova_service_state", + resource_id="b199db0c-1408-4d52-b5a5-5ca14de0ff37", + input_parameters={"state": "disabled"}) + + solution.add_action(action_type="migrate", + resource_id="f6416850-da28-4047-a547-8c49f53e95fe", + input_parameters={"source_node": "host1"}) + + solution.add_action(action_type="migrate", + resource_id="bb404e74-2caf-447b-bd1e-9234db386ca5", + input_parameters={"source_node": "host2"}) + + solution.add_action(action_type="migrate", + resource_id="f6416850-da28-4047-a547-8c49f53e95ff", + input_parameters={"source_node": "host1"}) + + solution.add_action(action_type="change_nova_service_state", + resource_id="b199db0c-1408-4d52-b5a5-5ca14de0ff36", + input_parameters={"state": "enabled"}) + + solution.add_action(action_type="change_nova_service_state", + resource_id="b199db0c-1408-4d52-b5a5-5ca14de0ff37", + input_parameters={"state": "enabled"}) + + with mock.patch.object( + pbase.NodeResourceConsolidationPlanner, "create_action", + wraps=self.planner.create_action + ) as m_create_action: + action_plan = self.planner.schedule( + self.context, self.audit.id, solution) + self.assertIsNotNone(action_plan.uuid) + self.assertEqual(7, m_create_action.call_count) + # check order + filters = {'action_plan_id': action_plan.id} + actions = objects.Action.dbapi.get_action_list(self.context, filters) + self.assertEqual("change_nova_service_state", actions[0].action_type) + self.assertEqual("change_nova_service_state", actions[1].action_type) + self.assertEqual("migrate", actions[2].action_type) + self.assertEqual("migrate", actions[3].action_type) + self.assertEqual("migrate", actions[4].action_type) + self.assertEqual("change_nova_service_state", actions[5].action_type) + self.assertEqual("change_nova_service_state", actions[6].action_type) + action0_uuid = actions[0]['uuid'] + action1_uuid = actions[1]['uuid'] + action2_uuid = actions[2]['uuid'] + action3_uuid = actions[3]['uuid'] + action4_uuid = actions[4]['uuid'] + action5_uuid = actions[5]['uuid'] + action6_uuid = actions[6]['uuid'] + # parents of action3,4,5 are action0,1 + # resource2 and 4 have the same source, + # so action about resource4 depends on + # action about resource2 + parents = [] + for action in actions: + if action.parents: + parents.extend(action.parents) + self.assertIn(action0_uuid, parents) + self.assertIn(action1_uuid, parents) + self.assertIn(action2_uuid, parents) + self.assertIn(action3_uuid, parents) + self.assertIn(action4_uuid, parents) + self.assertNotIn(action5_uuid, parents) + self.assertNotIn(action6_uuid, parents) + + +class TestDefaultPlanner(base.DbTestCase): + + def setUp(self): + super(TestDefaultPlanner, self).setUp() + self.planner = pbase.NodeResourceConsolidationPlanner(mock.Mock()) + + self.goal = obj_utils.create_test_goal(self.context) + self.strategy = obj_utils.create_test_strategy( + self.context, goal_id=self.goal.id) + obj_utils.create_test_audit_template( + self.context, goal_id=self.goal.id, strategy_id=self.strategy.id) + + p = mock.patch.object(db_api.BaseConnection, 'create_action_plan') + self.mock_create_action_plan = p.start() + self.mock_create_action_plan.side_effect = ( + self._simulate_action_plan_create) + self.addCleanup(p.stop) + + q = mock.patch.object(db_api.BaseConnection, 'create_action') + self.mock_create_action = q.start() + self.mock_create_action.side_effect = ( + self._simulate_action_create) + self.addCleanup(q.stop) + + def _simulate_action_plan_create(self, action_plan): + action_plan.create() + return action_plan + + def _simulate_action_create(self, action): + action.create() + return action + + @mock.patch.object(objects.Strategy, 'get_by_name') + def test_scheduler_warning_empty_action_plan(self, m_get_by_name): + m_get_by_name.return_value = self.strategy + audit = db_utils.create_test_audit( + goal_id=self.goal.id, strategy_id=self.strategy.id) + fake_solution = mock.MagicMock(efficacy_indicators=[], + actions=[]) + action_plan = self.planner.schedule( + self.context, audit.id, fake_solution) + self.assertIsNotNone(action_plan.uuid)