Fix audit creation with named goal and strategy

This patch set fixes process of audit creation and
allows to create audit without Audit Template using only
names of Goal and Strategy. It also provides some additional
unit tests to improve tests covering.

Change-Id: I89a9c7661616f49639151869055d8f5ebe723d5f
Closes-Bug: #1794233
This commit is contained in:
Alexander Chadin
2018-09-25 13:44:51 +03:00
parent 925b971377
commit 7cb81ac6c5
3 changed files with 127 additions and 92 deletions

View File

@@ -37,13 +37,16 @@ def post_get_test_audit(**kw):
audit_template = db_utils.get_test_audit_template()
goal = db_utils.get_test_goal()
del_keys = ['goal_id', 'strategy_id']
del_keys.extend(kw.get('params_to_exclude', []))
add_keys = {'audit_template_uuid': audit_template['uuid'],
'goal': goal['uuid'],
}
for k in del_keys:
del audit[k]
if kw.get('use_named_goal'):
add_keys['goal'] = 'TEST'
for k in add_keys:
audit[k] = kw.get(k, add_keys[k])
for k in del_keys:
del audit[k]
return audit
@@ -491,13 +494,10 @@ class TestPost(api_base.FunctionalTest):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -530,18 +530,78 @@ class TestPost(api_base.FunctionalTest):
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_with_at_uuid_and_goal_specified(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname'])
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_goal(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_goal_without_strategy(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid', 'strategy'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_named_goal(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid'],
use_named_goal=True)
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_invalid_audit_template_uuid(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
# Make the audit template UUID some garbage value
audit_dict['audit_template_uuid'] = (
'01234567-8910-1112-1314-151617181920')
@@ -558,14 +618,12 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_doesnt_contain_id(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
state = audit_dict['state']
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
with mock.patch.object(self.dbapi, 'create_audit',
wraps=self.dbapi.create_audit) as cn_mock:
response = self.post_json('/audits', audit_dict)
@@ -578,13 +636,9 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_generate_uuid(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -597,12 +651,9 @@ class TestPost(api_base.FunctionalTest):
def test_create_continuous_audit_with_interval(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '1200'
@@ -619,12 +670,9 @@ class TestPost(api_base.FunctionalTest):
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '* * * * *'
@@ -641,12 +689,9 @@ class TestPost(api_base.FunctionalTest):
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = 'zxc'
@@ -662,14 +707,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_continuous_audit_without_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
@@ -683,13 +724,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_oneshot_audit_with_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.ONESHOT.value
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
@@ -701,13 +739,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_trigger_decision_engine(self):
with mock.patch.object(deapi.DecisionEngineAPI,
'trigger_audit') as de_mock:
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
de_mock.assert_called_once_with(mock.ANY, response.json['uuid'])
@@ -726,13 +761,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_parameters_no_predefined_strategy(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(parameters={'name': 'Tom'})
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
parameters={'name': 'Tom'},
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -831,16 +863,13 @@ class TestPost(api_base.FunctionalTest):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit()
audit_dict = post_get_test_audit(
params_to_exclude=['state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
normal_name = 'this audit name is just for test'
# long_name length exceeds 63 characters
long_name = normal_name + audit_dict['uuid']
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict['name'] = normal_name
response = self.post_json('/audits', audit_dict)
@@ -962,12 +991,10 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
'op': 'replace'}], expect_errors=True)
def test_policy_disallow_create(self):
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
self._common_policy_check(
"audit:create", self.post_json, '/audits', audit_dict,
expect_errors=True)