Merge "Add parameters to force failures in nop action"

This commit is contained in:
Zuul
2025-08-21 14:32:37 +00:00
committed by Gerrit Code Review
3 changed files with 94 additions and 3 deletions

View File

@@ -0,0 +1,14 @@
---
features:
- |
Three new parameters have been added to the ``nop`` action:
* ``fail_pre_condition``: When setting it to `true` the action
fails on the pre_condition execution.
* ``fail_execute``: When setting it to `true` the action fails
on the execute step.
* ``fail_post_condition``: When setting it to `true` the action
fails on the post_condition execution.

View File

@@ -20,6 +20,7 @@
from oslo_log import log
from watcher.applier.actions import base
from watcher.common import exception
LOG = log.getLogger(__name__)
@@ -45,6 +46,18 @@ class Nop(base.BaseAction):
'properties': {
'message': {
'type': ['string', 'null']
},
'fail_pre_condition': {
'type': 'boolean',
'default': False
},
'fail_execute': {
'type': 'boolean',
'default': False
},
'fail_post_condition': {
'type': 'boolean',
'default': False
}
},
'required': ['message'],
@@ -55,8 +68,13 @@ class Nop(base.BaseAction):
def message(self):
return self.input_parameters.get(self.MESSAGE)
def debug_message(self, message):
return ("Executing action NOP message: %s ", message)
def execute(self):
LOG.debug("Executing action NOP message: %s ", self.message)
LOG.debug(self.debug_message(self.message))
if self.input_parameters.get('fail_execute'):
return False
return True
def revert(self):
@@ -64,10 +82,12 @@ class Nop(base.BaseAction):
return True
def pre_condition(self):
pass
if self.input_parameters.get('fail_pre_condition'):
raise exception.WatcherException("Failed in pre_condition")
def post_condition(self):
pass
if self.input_parameters.get('fail_post_condition'):
raise exception.WatcherException("Failed in post_condition")
def get_description(self):
"""Description of the action"""

View File

@@ -91,6 +91,63 @@ class TestTaskFlowActionContainer(base.DbTestCase):
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
def test_execute_with_failed_execute(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.PENDING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_execute': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
def test_pre_execute_with_failed_pre_condition(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.PENDING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_pre_condition': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.pre_execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
def test_post_execute_with_failed_post_condition(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.ONGOING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_post_condition': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.post_execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
@mock.patch('eventlet.spawn')
def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn):
action_plan = obj_utils.create_test_action_plan(