Add checking audit state
This patch adds checking audit state when updating an existing audit in accordance with audit state machine. Closes-Bug: #1662406 Change-Id: I20610c83169b77f141974a5cebe33818a4bf0728
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
import itertools
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
@@ -267,7 +268,7 @@ class TestPatch(api_base.FunctionalTest):
|
||||
test_time = datetime.datetime(2000, 1, 1, 0, 0)
|
||||
mock_utcnow.return_value = test_time
|
||||
|
||||
new_state = objects.audit.State.SUCCEEDED
|
||||
new_state = objects.audit.State.CANCELLED
|
||||
response = self.get_json('/audits/%s' % self.audit.uuid)
|
||||
self.assertNotEqual(new_state, response['state'])
|
||||
|
||||
@@ -343,6 +344,128 @@ class TestPatch(api_base.FunctionalTest):
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
|
||||
ALLOWED_TRANSITIONS = [
|
||||
{"original_state": objects.audit.State.PENDING,
|
||||
"new_state": objects.audit.State.ONGOING},
|
||||
{"original_state": objects.audit.State.PENDING,
|
||||
"new_state": objects.audit.State.CANCELLED},
|
||||
{"original_state": objects.audit.State.ONGOING,
|
||||
"new_state": objects.audit.State.FAILED},
|
||||
{"original_state": objects.audit.State.ONGOING,
|
||||
"new_state": objects.audit.State.SUCCEEDED},
|
||||
{"original_state": objects.audit.State.ONGOING,
|
||||
"new_state": objects.audit.State.CANCELLED},
|
||||
{"original_state": objects.audit.State.FAILED,
|
||||
"new_state": objects.audit.State.DELETED},
|
||||
{"original_state": objects.audit.State.SUCCEEDED,
|
||||
"new_state": objects.audit.State.DELETED},
|
||||
{"original_state": objects.audit.State.CANCELLED,
|
||||
"new_state": objects.audit.State.DELETED},
|
||||
]
|
||||
|
||||
|
||||
class TestPatchStateTransitionDenied(api_base.FunctionalTest):
|
||||
|
||||
STATES = [
|
||||
ap_state for ap_state in objects.audit.State.__dict__
|
||||
if not ap_state.startswith("_")
|
||||
]
|
||||
|
||||
scenarios = [
|
||||
(
|
||||
"%s -> %s" % (original_state, new_state),
|
||||
{"original_state": original_state,
|
||||
"new_state": new_state},
|
||||
)
|
||||
for original_state, new_state
|
||||
in list(itertools.product(STATES, STATES))
|
||||
if original_state != new_state
|
||||
and {"original_state": original_state,
|
||||
"new_state": new_state} not in ALLOWED_TRANSITIONS
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestPatchStateTransitionDenied, self).setUp()
|
||||
obj_utils.create_test_goal(self.context)
|
||||
obj_utils.create_test_strategy(self.context)
|
||||
obj_utils.create_test_audit_template(self.context)
|
||||
self.audit = obj_utils.create_test_audit(self.context,
|
||||
state=self.original_state)
|
||||
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
|
||||
self.mock_audit_update = p.start()
|
||||
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
def _simulate_rpc_audit_update(self, audit):
|
||||
audit.save()
|
||||
return audit
|
||||
|
||||
def test_replace_denied(self):
|
||||
response = self.get_json('/audits/%s' % self.audit.uuid)
|
||||
self.assertNotEqual(self.new_state, response['state'])
|
||||
|
||||
response = self.patch_json(
|
||||
'/audits/%s' % self.audit.uuid,
|
||||
[{'path': '/state', 'value': self.new_state,
|
||||
'op': 'replace'}],
|
||||
expect_errors=True)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertEqual(400, response.status_code)
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
response = self.get_json('/audits/%s' % self.audit.uuid)
|
||||
self.assertEqual(self.original_state, response['state'])
|
||||
|
||||
|
||||
class TestPatchStateTransitionOk(api_base.FunctionalTest):
|
||||
|
||||
scenarios = [
|
||||
(
|
||||
"%s -> %s" % (transition["original_state"],
|
||||
transition["new_state"]),
|
||||
transition
|
||||
)
|
||||
for transition in ALLOWED_TRANSITIONS
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestPatchStateTransitionOk, self).setUp()
|
||||
obj_utils.create_test_goal(self.context)
|
||||
obj_utils.create_test_strategy(self.context)
|
||||
obj_utils.create_test_audit_template(self.context)
|
||||
self.audit = obj_utils.create_test_audit(self.context,
|
||||
state=self.original_state)
|
||||
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
|
||||
self.mock_audit_update = p.start()
|
||||
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
def _simulate_rpc_audit_update(self, audit):
|
||||
audit.save()
|
||||
return audit
|
||||
|
||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
||||
def test_replace_ok(self, mock_utcnow):
|
||||
test_time = datetime.datetime(2000, 1, 1, 0, 0)
|
||||
mock_utcnow.return_value = test_time
|
||||
|
||||
response = self.get_json('/audits/%s' % self.audit.uuid)
|
||||
self.assertNotEqual(self.new_state, response['state'])
|
||||
|
||||
response = self.patch_json(
|
||||
'/audits/%s' % self.audit.uuid,
|
||||
[{'path': '/state', 'value': self.new_state,
|
||||
'op': 'replace'}])
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
response = self.get_json('/audits/%s' % self.audit.uuid)
|
||||
self.assertEqual(self.new_state, response['state'])
|
||||
return_updated_at = timeutils.parse_isotime(
|
||||
response['updated_at']).replace(tzinfo=None)
|
||||
self.assertEqual(test_time, return_updated_at)
|
||||
|
||||
|
||||
class TestPost(api_base.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
Reference in New Issue
Block a user