Files
watcher/watcher/tests/api/v1/test_audits.py
Alfredo Moralejo 891119470c Add a unit test to check the error when creating an audit with wrong parameters
Currently, it is returning http error code 500 instead of 400, which
would be the appropiate code.

A follow-up patch will be sent with the vix and switching the error code
and message.

Related-Bug: #2110538
Change-Id: I35ccbb9cf29fc08e78c4d5f626a6518062efbed3
2025-05-14 17:01:59 +02:00

1155 lines
50 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from dateutil import tz
import itertools
from unittest import mock
from urllib import parse as urlparse
from http import HTTPStatus
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from wsme import types as wtypes
from watcher.api.controllers.v1 import audit as api_audit
from watcher.common import utils
from watcher.db import api as db_api
from watcher.decision_engine import rpcapi as deapi
from watcher import objects
from watcher.tests.api import base as api_base
from watcher.tests.api import utils as api_utils
from watcher.tests import base
from watcher.tests.db import utils as db_utils
from watcher.tests.objects import utils as obj_utils
def post_get_test_audit(**kw):
audit = api_utils.audit_post_data(**kw)
audit_template = db_utils.get_test_audit_template()
goal = db_utils.get_test_goal()
del_keys = ['goal_id', 'strategy_id']
del_keys.extend(kw.get('params_to_exclude', []))
add_keys = {'audit_template_uuid': audit_template['uuid'],
'goal': goal['uuid'],
}
if kw.get('use_named_goal'):
add_keys['goal'] = 'TEST'
for k in add_keys:
audit[k] = kw.get(k, add_keys[k])
for k in del_keys:
del audit[k]
return audit
def post_get_test_audit_with_predefined_strategy(**kw):
spec = kw.pop('strategy_parameters_spec', {})
strategy_id = 2
strategy = db_utils.get_test_strategy(parameters_spec=spec, id=strategy_id)
audit = api_utils.audit_post_data(**kw)
audit_template = db_utils.get_test_audit_template(
strategy_id=strategy['id'])
del_keys = ['goal_id', 'strategy_id']
add_keys = {'audit_template_uuid': audit_template['uuid'],
}
for k in del_keys:
del audit[k]
for k in add_keys:
audit[k] = kw.get(k, add_keys[k])
return audit
class TestAuditObject(base.TestCase):
def test_audit_init(self):
audit_dict = api_utils.audit_post_data(audit_template_id=None,
goal_id=None,
strategy_id=None)
del audit_dict['state']
audit = api_audit.Audit(**audit_dict)
self.assertEqual(wtypes.Unset, audit.state)
class TestListAudit(api_base.FunctionalTest):
def setUp(self):
super(TestListAudit, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
def test_empty(self):
response = self.get_json('/audits')
self.assertEqual([], response['audits'])
def _assert_audit_fields(self, audit):
audit_fields = ['audit_type', 'scope', 'state', 'goal_uuid',
'strategy_uuid']
for field in audit_fields:
self.assertIn(field, audit)
def test_one(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits')
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
def test_one_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
response = self.get_json('/audits')
self.assertEqual([], response['audits'])
def test_get_one(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/%s' % audit['uuid'])
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
def test_get_one_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits/%s' % audit['uuid'],
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
response = self.get_json('/audits/%s' % audit['uuid'],
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
def test_detail(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/detail')
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
def test_detail_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits/detail',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
response = self.get_json('/audits/detail')
self.assertEqual([], response['audits'])
def test_detail_against_single(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/%s/detail' % audit['uuid'],
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
def test_many(self):
audit_list = []
for id_ in range(5):
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
audit_list.append(audit.uuid)
response = self.get_json('/audits')
self.assertEqual(len(audit_list), len(response['audits']))
uuids = [s['uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_list), sorted(uuids))
def test_many_without_soft_deleted(self):
audit_list = []
for id_ in [1, 2, 3]:
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
audit_list.append(audit.uuid)
for id_ in [4, 5]:
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
audit.soft_delete()
response = self.get_json('/audits')
self.assertEqual(3, len(response['audits']))
uuids = [s['uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_list), sorted(uuids))
def test_many_with_soft_deleted(self):
audit_list = []
for id_ in [1, 2, 3]:
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
audit_list.append(audit.uuid)
for id_ in [4, 5]:
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
audit.soft_delete()
audit_list.append(audit.uuid)
response = self.get_json('/audits',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(5, len(response['audits']))
uuids = [s['uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_list), sorted(uuids))
def test_many_with_sort_key_goal_uuid(self):
goal_list = []
for id_ in range(5):
goal = obj_utils.create_test_goal(
self.context,
name='gl{0}'.format(id_),
uuid=utils.generate_uuid())
obj_utils.create_test_audit(
self.context, id=id_, uuid=utils.generate_uuid(),
goal_id=goal.id, name='My Audit {0}'.format(id_))
goal_list.append(goal.uuid)
response = self.get_json('/audits/?sort_key=goal_uuid')
self.assertEqual(5, len(response['audits']))
uuids = [s['goal_uuid'] for s in response['audits']]
self.assertEqual(sorted(goal_list), uuids)
def test_sort_key_validation(self):
response = self.get_json(
'/audits?sort_key=%s' % 'bad_name',
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_audit(
self.context, id=1, uuid=uuid,
name='My Audit {0}'.format(1))
response = self.get_json('/audits/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for link in response['links']:
bookmark = link['rel'] == 'bookmark'
self.assertTrue(
self.validate_link(link['href'], bookmark=bookmark))
def test_collection_links(self):
for id_ in range(5):
obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
response = self.get_json('/audits/?limit=3')
self.assertEqual(3, len(response['audits']))
next_marker = response['audits'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(), name='My Audit {0}'.format(id_))
response = self.get_json('/audits')
self.assertEqual(3, len(response['audits']))
next_marker = response['audits'][-1]['uuid']
self.assertIn(next_marker, response['next'])
class TestPatch(api_base.FunctionalTest):
def setUp(self):
super(TestPatch, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context)
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_state = objects.audit.State.CANCELLED
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
def test_replace_non_existent_audit(self):
response = self.patch_json(
'/audits/%s' % utils.generate_uuid(),
[{'path': '/state', 'value': objects.audit.State.SUCCEEDED,
'op': 'replace'}], expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_add_ok(self):
new_state = objects.audit.State.SUCCEEDED
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': new_state, 'op': 'add'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_int)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(new_state, response['state'])
def test_add_non_existent_property(self):
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertTrue(response.json['error_message'])
def test_remove_ok(self):
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertIsNotNone(response['interval'])
response = self.patch_json('/audits/%s' % self.audit.uuid,
[{'path': '/interval', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertIsNone(response['interval'])
def test_remove_uuid(self):
response = self.patch_json('/audits/%s' % self.audit.uuid,
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_remove_non_existent_property(self):
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
ALLOWED_TRANSITIONS = [
{"original_state": key, "new_state": value}
for key, values in (
objects.audit.AuditStateTransitionManager.TRANSITIONS.items())
for value in values]
class TestPatchStateTransitionDenied(api_base.FunctionalTest):
STATES = [
ap_state for ap_state in objects.audit.State.__dict__
if not ap_state.startswith("_")
]
scenarios = [
(
"%s -> %s" % (original_state, new_state),
{"original_state": original_state,
"new_state": new_state},
)
for original_state, new_state
in list(itertools.product(STATES, STATES))
if original_state != new_state and
{"original_state": original_state,
"new_state": new_state} not in ALLOWED_TRANSITIONS
]
def setUp(self):
super(TestPatchStateTransitionDenied, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context,
state=self.original_state)
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
def test_replace_denied(self):
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(self.new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
self.assertTrue(response.json['error_message'])
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(self.original_state, response['state'])
class TestPatchStateTransitionOk(api_base.FunctionalTest):
scenarios = [
(
"%s -> %s" % (transition["original_state"],
transition["new_state"]),
transition
)
for transition in ALLOWED_TRANSITIONS
]
def setUp(self):
super(TestPatchStateTransitionOk, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context,
state=self.original_state)
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(self.new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(self.new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
class TestPost(api_base.FunctionalTest):
def setUp(self):
super(TestPost, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
p = mock.patch.object(db_api.BaseConnection, 'create_audit')
self.mock_create_audit = p.start()
self.mock_create_audit.side_effect = (
self._simulate_rpc_audit_create)
self.addCleanup(p.stop)
def _simulate_rpc_audit_create(self, audit):
audit.create()
return audit
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit(self, mock_utcnow, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
# Check location header
self.assertIsNotNone(response.location)
expected_location = '/v1/audits/%s' % response.json['uuid']
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertNotIn('updated_at', response.json.keys)
self.assertNotIn('deleted_at', response.json.keys)
return_created_at = timeutils.parse_isotime(
response.json['created_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_created_at)
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_with_state_not_allowed(self, mock_utcnow,
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(state=objects.audit.State.SUCCEEDED)
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_with_at_uuid_and_goal_specified(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname'])
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_goal(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_goal_without_strategy(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid', 'strategy'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_named_goal(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname',
'audit_template_uuid'],
use_named_goal=True)
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_invalid_audit_template_uuid(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
# Make the audit template UUID some garbage value
audit_dict['audit_template_uuid'] = (
'01234567-8910-1112-1314-151617181920')
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual("application/json", response.content_type)
expected_error_msg = ('The audit template UUID or name specified is '
'invalid')
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_doesnt_contain_id(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
state = audit_dict['state']
del audit_dict['state']
with mock.patch.object(self.dbapi, 'create_audit',
wraps=self.dbapi.create_audit) as cn_mock:
response = self.post_json('/audits', audit_dict)
self.assertEqual(state, response.json['state'])
cn_mock.assert_called_once_with(mock.ANY)
# Check that 'id' is not in first arg of positional args
self.assertNotIn('id', cn_mock.call_args[0][0])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_generate_uuid(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_interval(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '1200'
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertEqual(audit_dict['interval'], response.json['interval'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_cron_interval(self,
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '* * * * *'
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertEqual(audit_dict['interval'], response.json['interval'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_wrong_interval(self,
mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = 'zxc'
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.INTERNAL_SERVER_ERROR, response.status_int)
# NOTE(dviroel): this error message check was shortened to try avoid
# future breakages. See bug #2089866 for more details.
expected_error_msg = ('columns has to be specified for iterator '
'expression.')
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_without_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
expected_error_msg = ('Interval of audit must be specified '
'for CONTINUOUS.')
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_oneshot_audit_with_period(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['audit_type'] = objects.audit.AuditType.ONESHOT.value
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
expected_error_msg = 'Interval of audit must not be set for ONESHOT.'
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
def test_create_audit_trigger_decision_engine(self):
with mock.patch.object(deapi.DecisionEngineAPI,
'trigger_audit') as de_mock:
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict)
de_mock.assert_called_once_with(mock.ANY, response.json['uuid'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_uuid(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(state=objects.audit.State.PENDING)
del audit_dict['scope']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
assert not mock_trigger_audit.called
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_parameters_no_predefined_strategy(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
parameters={'name': 'Tom'},
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
expected_error_msg = ('Specify parameters but no predefined '
'strategy for audit, or no '
'parameter spec in predefined strategy')
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
assert not mock_trigger_audit.called
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_parameters_no_schema(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit_with_predefined_strategy(
parameters={'name': 'Tom'})
del audit_dict['uuid']
del audit_dict['state']
del audit_dict['interval']
del audit_dict['scope']
del audit_dict['next_run_time']
del audit_dict['hostname']
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
expected_error_msg = ('Specify parameters but no predefined '
'strategy for audit, or no '
'parameter spec in predefined strategy')
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
assert not mock_trigger_audit.called
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_parameter_not_allowed(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_template = self.prepare_audit_template_strategy_with_parameter()
audit_dict = api_utils.audit_post_data(
parameters={'fake1': 1, 'fake2': "hello"})
audit_dict['audit_template_uuid'] = audit_template['uuid']
del_keys = ['uuid', 'goal_id', 'strategy_id', 'state', 'interval',
'scope', 'next_run_time', 'hostname']
for k in del_keys:
del audit_dict[k]
response = self.post_json('/audits', audit_dict, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual("application/json", response.content_type)
expected_error_msg = 'Audit parameter fake2 are not allowed'
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
assert not mock_trigger_audit.called
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_missing_parameter(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_template = self.prepare_audit_template_strategy_with_parameter()
audit_dict = api_utils.audit_post_data(
parameters={})
audit_dict['audit_template_uuid'] = audit_template['uuid']
del_keys = ['uuid', 'goal_id', 'strategy_id', 'state', 'interval',
'scope', 'next_run_time', 'hostname']
for k in del_keys:
del audit_dict[k]
response = self.post_json('/audits', audit_dict, expect_errors=True)
# (amoralej) This should return HTTPStatus.BAD_REQUEST, however this
# review is adding the test to show wrong code is returned. I will
# switch this to be HTTPStatus.BAD_REQUEST in the fixing review.
self.assertEqual(HTTPStatus.INTERNAL_SERVER_ERROR, response.status_int)
self.assertEqual("application/json", response.content_type)
# (amoralej) uncomment with the fix
# expected_error_msg = (
# "Invalid parameters for strategy: 'fake1' is a required property")
# self.assertTrue(response.json['error_message'])
# self.assertIn(expected_error_msg, response.json['error_message'])
assert not mock_trigger_audit.called
def prepare_audit_template_strategy_with_parameter(self):
fake_spec = {
"properties": {
"fake1": {
"description": "number parameter example",
"type": "number",
"minimum": 1.0,
"maximum": 10.2,
}
},
'required': ['fake1']
}
template_uuid = 'e74c40e0-d825-11e2-a28f-0800200c9a67'
strategy_uuid = 'e74c40e0-d825-11e2-a28f-0800200c9a68'
template_name = 'my template'
strategy_name = 'my strategy'
strategy_id = 3
strategy = db_utils.get_test_strategy(parameters_spec=fake_spec,
id=strategy_id,
uuid=strategy_uuid,
name=strategy_name)
obj_utils.create_test_strategy(self.context,
parameters_spec=fake_spec,
id=strategy_id,
uuid=strategy_uuid,
name=strategy_name)
obj_utils.create_test_audit_template(self.context,
strategy_id=strategy_id,
uuid=template_uuid,
name='name')
audit_template = db_utils.get_test_audit_template(
strategy_id=strategy['id'], uuid=template_uuid, name=template_name)
return audit_template
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_with_name(self, mock_utcnow, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit(
params_to_exclude=['state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
normal_name = 'this audit name is just for test'
# long_name length exceeds 63 characters
long_name = normal_name + audit_dict['uuid']
del audit_dict['uuid']
audit_dict['name'] = normal_name
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(normal_name, response.json['name'])
audit_dict['name'] = long_name
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertNotEqual(long_name, response.json['name'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_start_end_time(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
start_time = datetime.datetime(2018, 3, 1, 0, 0)
end_time = datetime.datetime(2018, 4, 1, 0, 0)
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal']
)
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '1200'
audit_dict['start_time'] = str(start_time)
audit_dict['end_time'] = str(end_time)
response = self.post_json(
'/audits',
audit_dict,
headers={'OpenStack-API-Version': 'infra-optim 1.1'})
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(objects.audit.State.PENDING,
response.json['state'])
self.assertEqual(audit_dict['interval'], response.json['interval'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
return_start_time = timeutils.parse_isotime(
response.json['start_time'])
return_end_time = timeutils.parse_isotime(
response.json['end_time'])
iso_start_time = start_time.replace(
tzinfo=tz.tzlocal()).astimezone(tz.tzutc())
iso_end_time = end_time.replace(
tzinfo=tz.tzlocal()).astimezone(tz.tzutc())
self.assertEqual(iso_start_time, return_start_time)
self.assertEqual(iso_end_time, return_end_time)
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_continuous_audit_with_start_end_time_incompatible_version(
self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
start_time = datetime.datetime(2018, 3, 1, 0, 0)
end_time = datetime.datetime(2018, 4, 1, 0, 0)
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal']
)
audit_dict['audit_type'] = objects.audit.AuditType.CONTINUOUS.value
audit_dict['interval'] = '1200'
audit_dict['start_time'] = str(start_time)
audit_dict['end_time'] = str(end_time)
response = self.post_json(
'/audits',
audit_dict,
headers={'OpenStack-API-Version': 'infra-optim 1.0'},
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.NOT_ACCEPTABLE, response.status_int)
expected_error_msg = 'Request not acceptable.'
self.assertTrue(response.json['error_message'])
self.assertIn(expected_error_msg, response.json['error_message'])
assert not mock_trigger_audit.called
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_force_false(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
response = self.post_json(
'/audits',
audit_dict,
headers={'OpenStack-API-Version': 'infra-optim 1.2'})
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertFalse(response.json['force'])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_with_force_true(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(
params_to_exclude=['uuid', 'state', 'interval', 'scope',
'next_run_time', 'hostname', 'goal'])
audit_dict['force'] = True
response = self.post_json(
'/audits',
audit_dict,
headers={'OpenStack-API-Version': 'infra-optim 1.2'})
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertTrue(response.json['force'])
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
obj_utils.create_test_goal(self.context)
obj_utils.create_test_strategy(self.context)
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context)
p = mock.patch.object(db_api.BaseConnection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
@mock.patch('oslo_utils.timeutils.utcnow')
def test_delete_audit(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_state = objects.audit.State.ONGOING
self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
response = self.delete('/audits/%s' % self.audit.uuid,
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
new_state = objects.audit.State.CANCELLED
self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.delete('/audits/%s' % self.audit.uuid)
response = self.get_json('/audits/%s' % self.audit.uuid,
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
self.context.show_deleted = True
audit = objects.Audit.get_by_uuid(self.context, self.audit.uuid)
return_deleted_at = \
audit['deleted_at'].strftime('%Y-%m-%dT%H:%M:%S.%f')
self.assertEqual(test_time.strftime('%Y-%m-%dT%H:%M:%S.%f'),
return_deleted_at)
self.assertEqual(objects.audit.State.DELETED, audit['state'])
def test_delete_audit_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/audits/%s' % uuid, expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestAuditPolicyEnforcement(api_base.FunctionalTest):
def setUp(self):
super(TestAuditPolicyEnforcement, self).setUp()
obj_utils.create_test_goal(self.context)
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:default"})
response = func(*arg, **kwarg)
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"audit:get_all", self.get_json, '/audits',
expect_errors=True)
def test_policy_disallow_get_one(self):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:get", self.get_json,
'/audits/%s' % audit.uuid,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"audit:detail", self.get_json,
'/audits/detail',
expect_errors=True)
def test_policy_disallow_update(self):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:update", self.patch_json,
'/audits/%s' % audit.uuid,
[{'path': '/state', 'value': objects.audit.State.SUCCEEDED,
'op': 'replace'}], expect_errors=True)
def test_policy_disallow_create(self):
audit_dict = post_get_test_audit(
state=objects.audit.State.PENDING,
params_to_exclude=['uuid', 'state', 'scope',
'next_run_time', 'hostname', 'goal'])
self._common_policy_check(
"audit:create", self.post_json, '/audits', audit_dict,
expect_errors=True)
def test_policy_disallow_delete(self):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:delete", self.delete,
'/audits/%s' % audit.uuid, expect_errors=True)
class TestAuditEnforcementWithAdminContext(TestListAudit,
api_base.AdminRoleTest):
def setUp(self):
super(TestAuditEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"audit:create": "rule:default",
"audit:delete": "rule:default",
"audit:detail": "rule:default",
"audit:get": "rule:default",
"audit:get_all": "rule:default",
"audit:update": "rule:default"})