New cron type for audit interval

This patch set adds cron supporting for audit.

Implements: blueprint cron-based-continuous-audits
Change-Id: I8570bebb13332dfba80185e912aeda45b6b4cd70
This commit is contained in:
Alexander Chadin
2017-04-21 18:19:46 +03:00
parent 35fdbbe16e
commit 0b44492da7
20 changed files with 326 additions and 54 deletions

View File

@@ -481,6 +481,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -523,6 +524,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
# Make the audit template UUID some garbage value
audit_dict['audit_template_uuid'] = (
'01234567-8910-1112-1314-151617181920')
@@ -545,6 +547,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
with mock.patch.object(self.dbapi, 'create_audit',
wraps=self.dbapi.create_audit) as cn_mock:
response = self.post_json('/audits', audit_dict)
@@ -562,6 +565,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -571,15 +575,16 @@ class TestPost(api_base.FunctionalTest):
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_period(self, mock_trigger_audit):
def test_create_continuous_audit_with_interval(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = 1200
audit_dict['interval'] = '1200'
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
@@ -589,6 +594,48 @@ class TestPost(api_base.FunctionalTest):
self.assertEqual(audit_dict['interval'], response.json['interval'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_cron_interval(self,
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '* * * * *'
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertEqual(audit_dict['interval'], response.json['interval'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_wrong_interval(self,
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = 'zxc'
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(500, response.status_int)
expected_error_msg = ('Exactly 5 or 6 columns has to be '
'specified for iteratorexpression.')
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_without_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
@@ -599,6 +646,7 @@ class TestPost(api_base.FunctionalTest):
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
@@ -616,8 +664,8 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['uuid']
del audit_dict['state']
audit_dict['audit_type'] = objects.audit.AuditType.ONESHOT.value
audit_dict['interval'] = 1200
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
@@ -634,6 +682,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict)
de_mock.assert_called_once_with(mock.ANY, response.json['uuid'])
@@ -657,6 +706,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -678,6 +728,7 @@ class TestPost(api_base.FunctionalTest):
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -700,7 +751,7 @@ class TestPost(api_base.FunctionalTest):
audit_dict['audit_template_uuid'] = audit_template['uuid']
del_keys = ['uuid', 'goal_id', 'strategy_id', 'state', 'interval',
'scope']
'scope', 'next_run_time']
for k in del_keys:
del audit_dict[k]
@@ -839,6 +890,7 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['scope']
del audit_dict['next_run_time']
self._common_policy_check(
"audit:create", self.post_json, '/audits', audit_dict,
expect_errors=True)