Merge "Fix audit creation with named goal and strategy"

This commit is contained in:
Zuul
2018-10-05 09:34:05 +00:00
committed by Gerrit Code Review
3 changed files with 127 additions and 92 deletions

View File

@@ -37,13 +37,16 @@ def post_get_test_audit(**kw):
audit_template = db_utils.get_test_audit_template()
goal = db_utils.get_test_goal()
del_keys = ['goal_id', 'strategy_id']
del_keys.extend(kw.get('params_to_exclude', []))
add_keys = {'audit_template_uuid': audit_template['uuid'],
'goal': goal['uuid'],
}
for k in del_keys:
del audit[k]
if kw.get('use_named_goal'):
add_keys['goal'] = 'TEST'
for k in add_keys:
audit[k] = kw.get(k, add_keys[k])
for k in del_keys:
del audit[k]
return audit
@@ -491,13 +494,10 @@ class TestPost(api_base.FunctionalTest):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -530,18 +530,78 @@ class TestPost(api_base.FunctionalTest):
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_with_at_uuid_and_goal_specified(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname'])
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_goal(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_goal_without_strategy(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid', 'strategy'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_named_goal(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid'],
use_named_goal=True)
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_invalid_audit_template_uuid(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
# Make the audit template UUID some garbage value
audit_dict['audit_template_uuid'] = (
'01234567-8910-1112-1314-151617181920')
@@ -558,14 +618,12 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_doesnt_contain_id(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
state = audit_dict['state']
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
with mock.patch.object(self.dbapi, 'create_audit',
wraps=self.dbapi.create_audit) as cn_mock:
response = self.post_json('/audits', audit_dict)
@@ -578,13 +636,9 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_generate_uuid(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -597,12 +651,9 @@ class TestPost(api_base.FunctionalTest):
def test_create_continuous_audit_with_interval(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '1200'
@@ -619,12 +670,9 @@ class TestPost(api_base.FunctionalTest):
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '* * * * *'
@@ -641,12 +689,9 @@ class TestPost(api_base.FunctionalTest):
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = 'zxc'
@@ -662,14 +707,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_continuous_audit_without_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
@@ -683,13 +724,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_oneshot_audit_with_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.ONESHOT.value
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
@@ -701,13 +739,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_trigger_decision_engine(self):
with mock.patch.object(deapi.DecisionEngineAPI,
'trigger_audit') as de_mock:
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
de_mock.assert_called_once_with(mock.ANY, response.json['uuid'])
@@ -726,13 +761,10 @@ class TestPost(api_base.FunctionalTest):
def test_create_audit_parameters_no_predefined_strategy(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(parameters={'name': 'Tom'})
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
parameters={'name': 'Tom'},
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -831,16 +863,13 @@ class TestPost(api_base.FunctionalTest):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit()
audit_dict = post_get_test_audit(
params_to_exclude=['state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
normal_name = 'this audit name is just for test'
# long_name length exceeds 63 characters
long_name = normal_name + audit_dict['uuid']
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict['name'] = normal_name
response = self.post_json('/audits', audit_dict)
@@ -962,12 +991,10 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
'op': 'replace'}], expect_errors=True)
def test_policy_disallow_create(self):
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
self._common_policy_check(
"audit:create", self.post_json, '/audits', audit_dict,
expect_errors=True)