initial version

Change-Id: I699e0ab082657880998d8618fe29eb7f56c6c661
This commit is contained in:
David TARDIVEL
2015-06-04 15:26:55 +02:00
parent 073c6e49cb
commit d14e057da1
316 changed files with 27260 additions and 0 deletions

View File

243
watcher/tests/api/base.py Normal file
View File

@@ -0,0 +1,243 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base classes for API tests."""
# NOTE: Ported from ceilometer/tests/api.py (subsequently moved to
# ceilometer/tests/api/__init__.py). This should be oslo'ified:
# https://bugs.launchpad.net/watcher/+bug/1255115.
# NOTE(deva): import auth_token so we can override a config option
from keystonemiddleware import auth_token # noqa
# import mock
from oslo_config import cfg
import pecan
import pecan.testing
from six.moves.urllib import parse as urlparse
from watcher.api import hooks
from watcher.tests.db import base
PATH_PREFIX = '/v1'
class FunctionalTest(base.DbTestCase):
"""Pecan controller functional testing class.
Used for functional tests of Pecan controllers where you need to
test your literal application and its integration with the
framework.
"""
SOURCE_DATA = {'test_source': {'somekey': '666'}}
def setUp(self):
super(FunctionalTest, self).setUp()
cfg.CONF.set_override("auth_version", "v2.0",
group='keystone_authtoken')
cfg.CONF.set_override("admin_user", "admin",
group='keystone_authtoken')
self.app = self._make_app()
def reset_pecan():
pecan.set_config({}, overwrite=True)
self.addCleanup(reset_pecan)
def _make_app(self, enable_acl=False):
# Determine where we are so we can set up paths in the config
root_dir = self.path_get()
self.config = {
'app': {
'root': 'watcher.api.controllers.root.RootController',
'modules': ['watcher.api'],
'hooks': [
hooks.ContextHook(),
hooks.NoExceptionTracebackHook()
],
'static_root': '%s/public' % root_dir,
'template_path': '%s/api/templates' % root_dir,
'enable_acl': enable_acl,
'acl_public_routes': ['/', '/v1'],
},
}
return pecan.testing.load_test_app(self.config)
def _request_json(self, path, params, expect_errors=False, headers=None,
method="post", extra_environ=None, status=None,
path_prefix=PATH_PREFIX):
"""Sends simulated HTTP request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param method: Request method type. Appropriate method function call
should be used rather than passing attribute in.
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
:param path_prefix: prefix of the url path
"""
full_path = path_prefix + path
print('%s: %s %s' % (method.upper(), full_path, params))
response = getattr(self.app, "%s_json" % method)(
str(full_path),
params=params,
headers=headers,
status=status,
extra_environ=extra_environ,
expect_errors=expect_errors
)
print('GOT:%s' % response)
return response
def put_json(self, path, params, expect_errors=False, headers=None,
extra_environ=None, status=None):
"""Sends simulated HTTP PUT request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
"""
return self._request_json(path=path, params=params,
expect_errors=expect_errors,
headers=headers, extra_environ=extra_environ,
status=status, method="put")
def post_json(self, path, params, expect_errors=False, headers=None,
extra_environ=None, status=None):
"""Sends simulated HTTP POST request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
"""
return self._request_json(path=path, params=params,
expect_errors=expect_errors,
headers=headers, extra_environ=extra_environ,
status=status, method="post")
def patch_json(self, path, params, expect_errors=False, headers=None,
extra_environ=None, status=None):
"""Sends simulated HTTP PATCH request to Pecan test app.
:param path: url path of target service
:param params: content for wsgi.input of request
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
"""
return self._request_json(path=path, params=params,
expect_errors=expect_errors,
headers=headers, extra_environ=extra_environ,
status=status, method="patch")
def delete(self, path, expect_errors=False, headers=None,
extra_environ=None, status=None, path_prefix=PATH_PREFIX):
"""Sends simulated HTTP DELETE request to Pecan test app.
:param path: url path of target service
:param expect_errors: Boolean value; whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param status: expected status code of response
:param path_prefix: prefix of the url path
"""
full_path = path_prefix + path
print('DELETE: %s' % (full_path))
response = self.app.delete(str(full_path),
headers=headers,
status=status,
extra_environ=extra_environ,
expect_errors=expect_errors)
print('GOT:%s' % response)
return response
def get_json(self, path, expect_errors=False, headers=None,
extra_environ=None, q=[], path_prefix=PATH_PREFIX, **params):
"""Sends simulated HTTP GET request to Pecan test app.
:param path: url path of target service
:param expect_errors: Boolean value;whether an error is expected based
on request
:param headers: a dictionary of headers to send along with the request
:param extra_environ: a dictionary of environ variables to send along
with the request
:param q: list of queries consisting of: field, value, op, and type
keys
:param path_prefix: prefix of the url path
:param params: content for wsgi.input of request
"""
full_path = path_prefix + path
query_params = {'q.field': [],
'q.value': [],
'q.op': [],
}
for query in q:
for name in ['field', 'op', 'value']:
query_params['q.%s' % name].append(query.get(name, ''))
all_params = {}
all_params.update(params)
if q:
all_params.update(query_params)
print('GET: %s %r' % (full_path, all_params))
response = self.app.get(full_path,
params=all_params,
headers=headers,
extra_environ=extra_environ,
expect_errors=expect_errors)
if not expect_errors:
response = response.json
print('GOT:%s' % response)
return response
def validate_link(self, link, bookmark=False):
"""Checks if the given link can get correct data."""
# removes the scheme and net location parts of the link
url_parts = list(urlparse.urlparse(link))
url_parts[0] = url_parts[1] = ''
# bookmark link should not have the version in the URL
if bookmark and url_parts[2].startswith(PATH_PREFIX):
return False
full_path = urlparse.urlunparse(url_parts)
try:
self.get_json(full_path, path_prefix='')
return True
except Exception:
return False

View File

@@ -0,0 +1,30 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcher.tests.api import base
class TestBase(base.FunctionalTest):
def test_api_setup(self):
pass
def test_bad_uri(self):
response = self.get_json('/bad/path',
expect_errors=True,
headers={"Accept": "application/json"})
self.assertEqual(404, response.status_int)
self.assertEqual("application/json", response.content_type)
self.assertTrue(response.json['error_message'])

View File

@@ -0,0 +1,140 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from oslo_config import cfg
import oslo_messaging as messaging
from watcher.api.controllers import root
from watcher.api import hooks
from watcher.common import context as watcher_context
from watcher.tests.api import base as api_base
from watcher.tests import base
from watcher.tests import fakes
class TestContextHook(base.BaseTestCase):
def setUp(self):
super(TestContextHook, self).setUp()
self.app = fakes.FakeApp()
def test_context_hook_before_method(self):
state = mock.Mock(request=fakes.FakePecanRequest())
hook = hooks.ContextHook()
hook.before(state)
ctx = state.request.context
self.assertIsInstance(ctx, watcher_context.RequestContext)
self.assertEqual(ctx.auth_token,
fakes.fakeAuthTokenHeaders['X-Auth-Token'])
self.assertEqual(ctx.project_id,
fakes.fakeAuthTokenHeaders['X-Project-Id'])
self.assertEqual(ctx.user_id,
fakes.fakeAuthTokenHeaders['X-User-Id'])
self.assertEqual(ctx.auth_url,
fakes.fakeAuthTokenHeaders['X-Auth-Url'])
self.assertEqual(ctx.domain_name,
fakes.fakeAuthTokenHeaders['X-User-Domain-Name'])
self.assertEqual(ctx.domain_id,
fakes.fakeAuthTokenHeaders['X-User-Domain-Id'])
self.assertIsNone(ctx.auth_token_info)
def test_context_hook_before_method_auth_info(self):
state = mock.Mock(request=fakes.FakePecanRequest())
state.request.environ['keystone.token_info'] = 'assert_this'
hook = hooks.ContextHook()
hook.before(state)
ctx = state.request.context
self.assertIsInstance(ctx, watcher_context.RequestContext)
self.assertEqual(fakes.fakeAuthTokenHeaders['X-Auth-Token'],
ctx.auth_token)
self.assertEqual('assert_this', ctx.auth_token_info)
class TestNoExceptionTracebackHook(api_base.FunctionalTest):
TRACE = [
u'Traceback (most recent call last):',
u' File "/opt/stack/watcher/watcher/openstack/common/rpc/amqp.py",'
' line 434, in _process_data\\n **args)',
u' File "/opt/stack/watcher/watcher/openstack/common/rpc/'
'dispatcher.py", line 172, in dispatch\\n result ='
' getattr(proxyobj, method)(context, **kwargs)']
MSG_WITHOUT_TRACE = "Test exception message."
MSG_WITH_TRACE = MSG_WITHOUT_TRACE + "\n" + "\n".join(TRACE)
def setUp(self):
super(TestNoExceptionTracebackHook, self).setUp()
p = mock.patch.object(root.Root, 'convert')
self.root_convert_mock = p.start()
self.addCleanup(p.stop)
def test_hook_exception_success(self):
self.root_convert_mock.side_effect = Exception(self.MSG_WITH_TRACE)
response = self.get_json('/', path_prefix='', expect_errors=True)
actual_msg = json.loads(response.json['error_message'])['faultstring']
self.assertEqual(self.MSG_WITHOUT_TRACE, actual_msg)
def test_hook_remote_error_success(self):
test_exc_type = 'TestException'
self.root_convert_mock.side_effect = messaging.rpc.RemoteError(
test_exc_type, self.MSG_WITHOUT_TRACE, self.TRACE)
response = self.get_json('/', path_prefix='', expect_errors=True)
# NOTE(max_lobur): For RemoteError the client message will still have
# some garbage because in RemoteError traceback is serialized as a list
# instead of'\n'.join(trace). But since RemoteError is kind of very
# rare thing (happens due to wrong deserialization settings etc.)
# we don't care about this garbage.
expected_msg = ("Remote error: %s %s"
% (test_exc_type, self.MSG_WITHOUT_TRACE) + "\n[u'")
actual_msg = json.loads(response.json['error_message'])['faultstring']
self.assertEqual(expected_msg, actual_msg)
def test_hook_without_traceback(self):
msg = "Error message without traceback \n but \n multiline"
self.root_convert_mock.side_effect = Exception(msg)
response = self.get_json('/', path_prefix='', expect_errors=True)
actual_msg = json.loads(response.json['error_message'])['faultstring']
self.assertEqual(msg, actual_msg)
def test_hook_server_debug_on_serverfault(self):
cfg.CONF.set_override('debug', True)
self.root_convert_mock.side_effect = Exception(self.MSG_WITH_TRACE)
response = self.get_json('/', path_prefix='', expect_errors=True)
actual_msg = json.loads(
response.json['error_message'])['faultstring']
self.assertEqual(self.MSG_WITHOUT_TRACE, actual_msg)
def test_hook_server_debug_on_clientfault(self):
cfg.CONF.set_override('debug', True)
client_error = Exception(self.MSG_WITH_TRACE)
client_error.code = 400
self.root_convert_mock.side_effect = client_error
response = self.get_json('/', path_prefix='', expect_errors=True)
actual_msg = json.loads(
response.json['error_message'])['faultstring']
self.assertEqual(self.MSG_WITH_TRACE, actual_msg)

View File

@@ -0,0 +1,44 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcher.tests.api import base
class TestRoot(base.FunctionalTest):
def test_get_root(self):
data = self.get_json('/', path_prefix='')
self.assertEqual('v1', data['default_version']['id'])
# Check fields are not empty
[self.assertNotIn(f, ['', []]) for f in data.keys()]
class TestV1Root(base.FunctionalTest):
def test_get_v1_root(self):
data = self.get_json('/')
self.assertEqual('v1', data['id'])
# Check fields are not empty
for f in data.keys():
self.assertNotIn(f, ['', []])
# Check if all known resources are present and there are no extra ones.
not_resources = ('id', 'links', 'media_types')
actual_resources = tuple(set(data.keys()) - set(not_resources))
expected_resources = ('audit_templates', 'audits', 'actions',
'action_plans')
self.assertEqual(sorted(expected_resources), sorted(actual_resources))
self.assertIn({'type': 'application/vnd.openstack.watcher.v1+json',
'base': 'application/json'}, data['media_types'])

103
watcher/tests/api/utils.py Normal file
View File

@@ -0,0 +1,103 @@
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utils for testing the API service.
"""
import datetime
import json
from watcher.api.controllers.v1 import action as action_ctrl
from watcher.api.controllers.v1 import action_plan as action_plan_ctrl
from watcher.api.controllers.v1 import audit as audit_ctrl
from watcher.api.controllers.v1 import audit_template as audit_template_ctrl
from watcher.tests.db import utils as db_utils
ADMIN_TOKEN = '4562138218392831'
MEMBER_TOKEN = '4562138218392832'
class FakeMemcache(object):
"""Fake cache that is used for keystone tokens lookup."""
_cache = {
'tokens/%s' % ADMIN_TOKEN: {
'access': {
'token': {'id': ADMIN_TOKEN,
'expires': '2100-09-11T00:00:00'},
'user': {'id': 'user_id1',
'name': 'user_name1',
'tenantId': '123i2910',
'tenantName': 'mytenant',
'roles': [{'name': 'admin'}]
},
}
},
'tokens/%s' % MEMBER_TOKEN: {
'access': {
'token': {'id': MEMBER_TOKEN,
'expires': '2100-09-11T00:00:00'},
'user': {'id': 'user_id2',
'name': 'user-good',
'tenantId': 'project-good',
'tenantName': 'goodies',
'roles': [{'name': 'Member'}]
}
}
}
}
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
def get(self, key):
dt = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
return json.dumps((self._cache.get(key), dt.isoformat()))
def set(self, key, value, time=0, min_compress_len=0):
self.set_value = value
self.set_key = key
def remove_internal(values, internal):
# NOTE(yuriyz): internal attributes should not be posted, except uuid
int_attr = [attr.lstrip('/') for attr in internal if attr != '/uuid']
return dict([(k, v) for (k, v) in values.iteritems() if k not in int_attr])
def audit_post_data(**kw):
audit = db_utils.get_test_audit(**kw)
internal = audit_ctrl.AuditPatchType.internal_attrs()
return remove_internal(audit, internal)
def audit_template_post_data(**kw):
audit_template = db_utils.get_test_audit_template(**kw)
internal = audit_template_ctrl.AuditTemplatePatchType.internal_attrs()
return remove_internal(audit_template, internal)
def action_post_data(**kw):
action = db_utils.get_test_action(**kw)
internal = action_ctrl.ActionPatchType.internal_attrs()
return remove_internal(action, internal)
def action_plan_post_data(**kw):
act_plan = db_utils.get_test_action_plan(**kw)
internal = action_plan_ctrl.ActionPlanPatchType.internal_attrs()
return remove_internal(act_plan, internal)

View File

View File

@@ -0,0 +1,587 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from wsme import types as wtypes
from watcher.api.controllers.v1 import action as api_action
from watcher.common import utils
from watcher.db import api as db_api
from watcher import objects
from watcher.tests.api import base as api_base
from watcher.tests.api import utils as api_utils
from watcher.tests import base
from watcher.tests.db import utils as db_utils
from watcher.tests.objects import utils as obj_utils
def post_get_test_action(**kw):
action = api_utils.action_post_data(**kw)
action_plan = db_utils.get_test_action_plan()
action['action_plan_id'] = None
action['action_plan_uuid'] = kw.get('action_plan_uuid',
action_plan['uuid'])
action['next'] = None
return action
class TestActionObject(base.TestCase):
def test_action_init(self):
action_dict = api_utils.action_post_data(action_plan_id=None,
next=None)
del action_dict['state']
action = api_action.Action(**action_dict)
self.assertEqual(wtypes.Unset, action.state)
class TestListAction(api_base.FunctionalTest):
def setUp(self):
super(TestListAction, self).setUp()
obj_utils.create_test_action_plan(self.context)
def test_empty(self):
response = self.get_json('/actions')
self.assertEqual([], response['actions'])
def _assert_action_fields(self, action):
action_fields = ['uuid', 'state', 'action_plan_uuid', 'action_type']
for field in action_fields:
self.assertIn(field, action)
def test_one(self):
action = obj_utils.create_test_action(self.context, next=None)
response = self.get_json('/actions')
self.assertEqual(action.uuid, response['actions'][0]["uuid"])
self._assert_action_fields(response['actions'][0])
def test_one_soft_deleted(self):
action = obj_utils.create_test_action(self.context, next=None)
action.soft_delete()
response = self.get_json('/actions',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action.uuid, response['actions'][0]["uuid"])
self._assert_action_fields(response['actions'][0])
response = self.get_json('/actions')
self.assertEqual([], response['actions'])
def test_get_one(self):
action = obj_utils.create_test_action(self.context, next=None)
response = self.get_json('/actions/%s' % action['uuid'])
self.assertEqual(action.uuid, response['uuid'])
self.assertEqual(action.description, response['description'])
self.assertEqual(action.src, response['src'])
self.assertEqual(action.dst, response['dst'])
self.assertEqual(action.action_type, response['action_type'])
self.assertEqual(action.parameter, response['parameter'])
self._assert_action_fields(response)
def test_get_one_soft_deleted(self):
action = obj_utils.create_test_action(self.context, next=None)
action.soft_delete()
response = self.get_json('/actions/%s' % action['uuid'],
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action.uuid, response['uuid'])
self._assert_action_fields(response)
response = self.get_json('/actions/%s' % action['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_detail(self):
action = obj_utils.create_test_action(self.context, next=None)
response = self.get_json('/actions/detail')
self.assertEqual(action.uuid, response['actions'][0]["uuid"])
self._assert_action_fields(response['actions'][0])
def test_detail_soft_deleted(self):
action = obj_utils.create_test_action(self.context, next=None)
action.soft_delete()
response = self.get_json('/actions/detail',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action.uuid, response['actions'][0]["uuid"])
self._assert_action_fields(response['actions'][0])
response = self.get_json('/actions/detail')
self.assertEqual([], response['actions'])
def test_detail_against_single(self):
action = obj_utils.create_test_action(self.context, next=None)
response = self.get_json('/actions/%s/detail' % action['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
action_list = []
for id_ in range(5):
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
response = self.get_json('/actions')
self.assertEqual(len(action_list), len(response['actions']))
uuids = [s['uuid'] for s in response['actions']]
self.assertEqual(sorted(action_list), sorted(uuids))
def test_many_with_action_plan_uuid(self):
action_plan = obj_utils.create_test_action_plan(
self.context,
id=2,
uuid=utils.generate_uuid(),
audit_id=1)
action_list = []
for id_ in range(5):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=2,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
response = self.get_json('/actions')
self.assertEqual(len(action_list), len(response['actions']))
for action in response['actions']:
self.assertEqual(action_plan.uuid, action['action_plan_uuid'])
def test_filter_by_audit_uuid(self):
audit = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
action_plan_1 = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit.id)
action_list = []
for id_ in range(3):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=action_plan_1.id,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
audit2 = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
action_plan_2 = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit2.id)
for id_ in range(4, 5, 6):
obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=action_plan_2.id,
uuid=utils.generate_uuid())
response = self.get_json('/actions?audit_uuid=%s' % audit.uuid)
self.assertEqual(len(action_list), len(response['actions']))
for action in response['actions']:
self.assertEqual(action_plan_1.uuid, action['action_plan_uuid'])
def test_filter_by_action_plan_uuid(self):
audit = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
action_plan_1 = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit.id)
action_list = []
for id_ in range(3):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=action_plan_1.id,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
action_plan_2 = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit.id)
for id_ in range(4, 5, 6):
obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=action_plan_2.id,
uuid=utils.generate_uuid())
response = self.get_json(
'/actions?action_plan_uuid=%s' % action_plan_1.uuid)
self.assertEqual(len(action_list), len(response['actions']))
for action in response['actions']:
self.assertEqual(action_plan_1.uuid, action['action_plan_uuid'])
response = self.get_json(
'/actions?action_plan_uuid=%s' % action_plan_2.uuid)
for action in response['actions']:
self.assertEqual(action_plan_2.uuid, action['action_plan_uuid'])
def test_details_and_filter_by_action_plan_uuid(self):
audit = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
action_plan = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit.id)
for id_ in range(3):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=action_plan.id,
uuid=utils.generate_uuid())
response = self.get_json(
'/actions/detail?action_plan_uuid=%s' % action_plan.uuid)
for action in response['actions']:
self.assertEqual(action_plan.uuid, action['action_plan_uuid'])
def test_details_and_filter_by_audit_uuid(self):
audit = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
action_plan = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit.id)
for id_ in range(3):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=action_plan.id,
uuid=utils.generate_uuid())
response = self.get_json(
'/actions/detail?audit_uuid=%s' % audit.uuid)
for action in response['actions']:
self.assertEqual(action_plan.uuid, action['action_plan_uuid'])
def test_filter_by_action_plan_and_audit_uuids(self):
audit = obj_utils.create_test_audit(
self.context, uuid=utils.generate_uuid())
action_plan = obj_utils.create_test_action_plan(
self.context,
uuid=utils.generate_uuid(),
audit_id=audit.id)
url = '/actions?action_plan_uuid=%s&audit_uuid=%s' % (
action_plan.uuid, audit.uuid)
response = self.get_json(url, expect_errors=True)
self.assertEqual(400, response.status_int)
def test_many_with_soft_deleted_action_plan_uuid(self):
action_plan1 = obj_utils.create_test_action_plan(
self.context,
id=2,
uuid=utils.generate_uuid(),
audit_id=1)
action_plan2 = obj_utils.create_test_action_plan(
self.context,
id=3,
uuid=utils.generate_uuid(),
audit_id=1)
action_list = []
for id_ in range(0, 2):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=2,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
for id_ in range(2, 4):
action = obj_utils.create_test_action(
self.context, id=id_,
action_plan_id=3,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
self.delete('/action_plans/%s' % action_plan1.uuid)
response = self.get_json('/actions')
self.assertEqual(len(action_list), len(response['actions']))
for id_ in range(0, 2):
action = response['actions'][id_]
self.assertEqual(None, action['action_plan_uuid'])
for id_ in range(2, 4):
action = response['actions'][id_]
self.assertEqual(action_plan2.uuid, action['action_plan_uuid'])
def test_many_with_next_uuid(self):
action_list = []
for id_ in range(5):
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid(),
next=id_ + 1)
action_list.append(action.uuid)
response = self.get_json('/actions')
response_actions = response['actions']
for id in [0, 1, 2, 3]:
self.assertEqual(response_actions[id]['next_uuid'],
response_actions[id + 1]['uuid'])
def test_many_without_soft_deleted(self):
action_list = []
for id_ in [1, 2, 3]:
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
for id_ in [4, 5]:
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid())
action.soft_delete()
response = self.get_json('/actions')
self.assertEqual(3, len(response['actions']))
uuids = [s['uuid'] for s in response['actions']]
self.assertEqual(sorted(action_list), sorted(uuids))
def test_many_with_soft_deleted(self):
action_list = []
for id_ in [1, 2, 3]:
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid())
action_list.append(action.uuid)
for id_ in [4, 5]:
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid())
action.soft_delete()
action_list.append(action.uuid)
response = self.get_json('/actions',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(5, len(response['actions']))
uuids = [s['uuid'] for s in response['actions']]
self.assertEqual(sorted(action_list), sorted(uuids))
def test_many_with_sort_key_next_uuid(self):
for id_ in range(5):
obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid(),
next=id_ + 1)
response = self.get_json('/actions/')
reference_uuids = [(s['next_uuid'] if 'next_uuid' in s else None)
for s in response['actions']]
response = self.get_json('/actions/?sort_key=next_uuid')
self.assertEqual(5, len(response['actions']))
uuids = [(s['next_uuid'] if 'next_uuid' in s else None)
for s in response['actions']]
self.assertEqual(sorted(reference_uuids), uuids)
response = self.get_json('/actions/?sort_key=next_uuid&sort_dir=desc')
self.assertEqual(5, len(response['actions']))
uuids = [(s['next_uuid'] if 'next_uuid' in s else None)
for s in response['actions']]
self.assertEqual(sorted(reference_uuids, reverse=True), uuids)
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_action(self.context, id=1, uuid=uuid)
response = self.get_json('/actions/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for l in response['links']:
bookmark = l['rel'] == 'bookmark'
self.assertTrue(self.validate_link(l['href'], bookmark=bookmark))
def test_collection_links(self):
next = -1
for id_ in range(5):
action = obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid(),
next=next)
next = action.id
response = self.get_json('/actions/?limit=3')
self.assertEqual(3, len(response['actions']))
next_marker = response['actions'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_test_action(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/actions')
self.assertEqual(3, len(response['actions']))
next_marker = response['actions'][-1]['uuid']
self.assertIn(next_marker, response['next'])
class TestPatch(api_base.FunctionalTest):
def setUp(self):
super(TestPatch, self).setUp()
obj_utils.create_test_action_plan(self.context)
self.action = obj_utils.create_test_action(self.context, next=None)
p = mock.patch.object(db_api.Connection, 'update_action')
self.mock_action_update = p.start()
self.mock_action_update.side_effect = self._simulate_rpc_action_update
self.addCleanup(p.stop)
def _simulate_rpc_action_update(self, action):
action.save()
return action
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_state = 'SUBMITTED'
response = self.get_json('/actions/%s' % self.action.uuid)
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/actions/%s' % self.action.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json('/actions/%s' % self.action.uuid)
self.assertEqual(new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
def test_replace_non_existent_action(self):
response = self.patch_json('/actions/%s' % utils.generate_uuid(),
[{'path': '/state', 'value': 'SUBMITTED',
'op': 'replace'}],
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_add_ok(self):
new_state = 'SUCCESS'
response = self.patch_json(
'/actions/%s' % self.action.uuid,
[{'path': '/state', 'value': new_state, 'op': 'add'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_int)
response = self.get_json('/actions/%s' % self.action.uuid)
self.assertEqual(new_state, response['state'])
def test_add_non_existent_property(self):
response = self.patch_json(
'/actions/%s' % self.action.uuid,
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
self.assertTrue(response.json['error_message'])
def test_remove_ok(self):
response = self.get_json('/actions/%s' % self.action.uuid)
self.assertIsNotNone(response['state'])
response = self.patch_json('/actions/%s' % self.action.uuid,
[{'path': '/state', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json('/actions/%s' % self.action.uuid)
self.assertIsNone(response['state'])
def test_remove_uuid(self):
response = self.patch_json('/actions/%s' % self.action.uuid,
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_remove_non_existent_property(self):
response = self.patch_json(
'/actions/%s' % self.action.uuid,
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_code)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
# class TestDelete(api_base.FunctionalTest):
# def setUp(self):
# super(TestDelete, self).setUp()
# self.action = obj_utils.create_test_action(self.context, next=None)
# p = mock.patch.object(db_api.Connection, 'destroy_action')
# self.mock_action_delete = p.start()
# self.mock_action_delete.side_effect =
# self._simulate_rpc_action_delete
# self.addCleanup(p.stop)
# def _simulate_rpc_action_delete(self, action_uuid):
# action = objects.Action.get_by_uuid(self.context, action_uuid)
# action.destroy()
# def test_delete_action(self):
# self.delete('/actions/%s' % self.action.uuid)
# response = self.get_json('/actions/%s' % self.action.uuid,
# expect_errors=True)
# self.assertEqual(404, response.status_int)
# self.assertEqual('application/json', response.content_type)
# self.assertTrue(response.json['error_message'])
# def test_delete_action_not_found(self):
# uuid = utils.generate_uuid()
# response = self.delete('/actions/%s' % uuid, expect_errors=True)
# self.assertEqual(404, response.status_int)
# self.assertEqual('application/json', response.content_type)
# self.assertTrue(response.json['error_message'])
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
obj_utils.create_test_action_plan(self.context)
self.action = obj_utils.create_test_action(self.context, next=None)
p = mock.patch.object(db_api.Connection, 'update_action')
self.mock_action_update = p.start()
self.mock_action_update.side_effect = self._simulate_rpc_action_update
self.addCleanup(p.stop)
def _simulate_rpc_action_update(self, action):
action.save()
return action
@mock.patch('oslo_utils.timeutils.utcnow')
def test_delete_action(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
self.delete('/actions/%s' % self.action.uuid)
response = self.get_json('/actions/%s' % self.action.uuid,
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
self.context.show_deleted = True
action = objects.Action.get_by_uuid(self.context, self.action.uuid)
return_deleted_at = timeutils.strtime(action['deleted_at'])
self.assertEqual(timeutils.strtime(test_time), return_deleted_at)
self.assertEqual(action['state'], 'DELETED')
def test_delete_action_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/actions/%s' % uuid, expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])

View File

@@ -0,0 +1,433 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from wsme import types as wtypes
from watcher.api.controllers.v1 import action_plan as api_action_plan
from watcher.applier.framework import rpcapi as aapi
from watcher.common import utils
from watcher.db import api as db_api
from watcher import objects
from watcher.tests.api import base as api_base
from watcher.tests.api import utils as api_utils
from watcher.tests import base
from watcher.tests.objects import utils as obj_utils
class TestActionPlanObject(base.TestCase):
def test_actionPlan_init(self):
act_plan_dict = api_utils.action_plan_post_data()
del act_plan_dict['state']
del act_plan_dict['audit_id']
act_plan = api_action_plan.ActionPlan(**act_plan_dict)
self.assertEqual(wtypes.Unset, act_plan.state)
class TestListActionPlan(api_base.FunctionalTest):
def test_empty(self):
response = self.get_json('/action_plans')
self.assertEqual([], response['action_plans'])
def _assert_action_plans_fields(self, action_plan):
action_plan_fields = ['state']
for field in action_plan_fields:
self.assertIn(field, action_plan)
def test_one(self):
action_plan = obj_utils.create_action_plan_without_audit(self.context)
response = self.get_json('/action_plans')
self.assertEqual(action_plan.uuid,
response['action_plans'][0]["uuid"])
self._assert_action_plans_fields(response['action_plans'][0])
def test_one_soft_deleted(self):
action_plan = obj_utils.create_action_plan_without_audit(self.context)
action_plan.soft_delete()
response = self.get_json('/action_plans',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action_plan.uuid,
response['action_plans'][0]["uuid"])
self._assert_action_plans_fields(response['action_plans'][0])
response = self.get_json('/action_plans')
self.assertEqual([], response['action_plans'])
def test_get_one(self):
action_plan = obj_utils.create_action_plan_without_audit(self.context)
response = self.get_json('/action_plans/%s' % action_plan['uuid'])
self.assertEqual(action_plan.uuid, response['uuid'])
self._assert_action_plans_fields(response)
def test_get_one_soft_deleted(self):
action_plan = obj_utils.create_action_plan_without_audit(self.context)
action_plan.soft_delete()
response = self.get_json('/action_plans/%s' % action_plan['uuid'],
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action_plan.uuid, response['uuid'])
self._assert_action_plans_fields(response)
response = self.get_json('/action_plans/%s' % action_plan['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_detail(self):
action_plan = obj_utils.create_test_action_plan(self.context,
audit_id=None)
response = self.get_json('/action_plans/detail')
self.assertEqual(action_plan.uuid,
response['action_plans'][0]["uuid"])
self._assert_action_plans_fields(response['action_plans'][0])
def test_detail_soft_deleted(self):
action_plan = obj_utils.create_action_plan_without_audit(self.context)
action_plan.soft_delete()
response = self.get_json('/action_plans/detail',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action_plan.uuid,
response['action_plans'][0]["uuid"])
self._assert_action_plans_fields(response['action_plans'][0])
response = self.get_json('/action_plans/detail')
self.assertEqual([], response['action_plans'])
def test_detail_against_single(self):
action_plan = obj_utils.create_test_action_plan(self.context)
response = self.get_json(
'/action_plan/%s/detail' % action_plan['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
action_plan_list = []
for id_ in range(5):
action_plan = obj_utils.create_action_plan_without_audit(
self.context, id=id_, uuid=utils.generate_uuid())
action_plan_list.append(action_plan.uuid)
response = self.get_json('/action_plans')
self.assertEqual(len(action_plan_list), len(response['action_plans']))
uuids = [s['uuid'] for s in response['action_plans']]
self.assertEqual(sorted(action_plan_list), sorted(uuids))
def test_many_with_soft_deleted_audit_uuid(self):
action_plan_list = []
audit1 = obj_utils.create_test_audit(self.context,
id=1,
uuid=utils.generate_uuid())
audit2 = obj_utils.create_test_audit(self.context,
id=2,
uuid=utils.generate_uuid())
for id_ in range(0, 2):
action_plan = obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=audit1.id)
action_plan_list.append(action_plan.uuid)
for id_ in range(2, 4):
action_plan = obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=audit2.id)
action_plan_list.append(action_plan.uuid)
self.delete('/audits/%s' % audit1.uuid)
response = self.get_json('/action_plans')
self.assertEqual(len(action_plan_list), len(response['action_plans']))
for id_ in range(0, 2):
action_plan = response['action_plans'][id_]
self.assertEqual(None, action_plan['audit_uuid'])
for id_ in range(2, 4):
action_plan = response['action_plans'][id_]
self.assertEqual(audit2.uuid, action_plan['audit_uuid'])
def test_many_with_audit_uuid(self):
action_plan_list = []
audit = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
for id_ in range(5):
action_plan = obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=audit.id)
action_plan_list.append(action_plan.uuid)
response = self.get_json('/action_plans')
self.assertEqual(len(action_plan_list), len(response['action_plans']))
for action in response['action_plans']:
self.assertEqual(audit.uuid, action['audit_uuid'])
def test_many_with_audit_uuid_filter(self):
action_plan_list1 = []
audit1 = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
for id_ in range(5):
action_plan = obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=audit1.id)
action_plan_list1.append(action_plan.uuid)
audit2 = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
action_plan_list2 = []
for id_ in [5, 6, 7]:
action_plan = obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=audit2.id)
action_plan_list2.append(action_plan.uuid)
response = self.get_json('/action_plans?audit_uuid=%s' % audit2.uuid)
self.assertEqual(len(action_plan_list2), len(response['action_plans']))
for action in response['action_plans']:
self.assertEqual(audit2.uuid, action['audit_uuid'])
def test_many_without_soft_deleted(self):
action_plan_list = []
for id_ in [1, 2, 3]:
action_plan = obj_utils.create_action_plan_without_audit(
self.context, id=id_, uuid=utils.generate_uuid())
action_plan_list.append(action_plan.uuid)
for id_ in [4, 5]:
action_plan = obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid())
action_plan.soft_delete()
response = self.get_json('/action_plans')
self.assertEqual(3, len(response['action_plans']))
uuids = [s['uuid'] for s in response['action_plans']]
self.assertEqual(sorted(action_plan_list), sorted(uuids))
def test_many_with_soft_deleted(self):
action_plan_list = []
for id_ in [1, 2, 3]:
action_plan = obj_utils.create_action_plan_without_audit(
self.context, id=id_, uuid=utils.generate_uuid())
action_plan_list.append(action_plan.uuid)
for id_ in [4, 5]:
action_plan = obj_utils.create_action_plan_without_audit(
self.context, id=id_, uuid=utils.generate_uuid())
action_plan.soft_delete()
action_plan_list.append(action_plan.uuid)
response = self.get_json('/action_plans',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(5, len(response['action_plans']))
uuids = [s['uuid'] for s in response['action_plans']]
self.assertEqual(sorted(action_plan_list), sorted(uuids))
def test_many_with_sort_key_audit_uuid(self):
audit_list = []
for id_ in range(5):
audit = obj_utils.create_test_audit(self.context,
uuid=utils.generate_uuid())
obj_utils.create_test_action_plan(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=audit.id)
audit_list.append(audit.uuid)
response = self.get_json('/action_plans/?sort_key=audit_uuid')
self.assertEqual(5, len(response['action_plans']))
uuids = [s['audit_uuid'] for s in response['action_plans']]
self.assertEqual(sorted(audit_list), uuids)
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_action_plan_without_audit(self.context,
id=1, uuid=uuid)
response = self.get_json('/action_plans/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for l in response['links']:
bookmark = l['rel'] == 'bookmark'
self.assertTrue(self.validate_link(l['href'], bookmark=bookmark))
def test_collection_links(self):
for id_ in range(5):
obj_utils.create_action_plan_without_audit(
self.context, id=id_, uuid=utils.generate_uuid())
response = self.get_json('/action_plans/?limit=3')
self.assertEqual(3, len(response['action_plans']))
next_marker = response['action_plans'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_action_plan_without_audit(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_id=None)
response = self.get_json('/action_plans')
self.assertEqual(3, len(response['action_plans']))
next_marker = response['action_plans'][-1]['uuid']
self.assertIn(next_marker, response['next'])
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
self.action_plan = obj_utils.create_action_plan_without_audit(
self.context)
p = mock.patch.object(db_api.Connection, 'destroy_action_plan')
self.mock_action_plan_delete = p.start()
self.mock_action_plan_delete.side_effect = \
self._simulate_rpc_action_plan_delete
self.addCleanup(p.stop)
def _simulate_rpc_action_plan_delete(self, audit_uuid):
action_plan = objects.ActionPlan.get_by_uuid(self.context, audit_uuid)
action_plan.destroy()
def test_delete_action_plan(self):
self.delete('/action_plans/%s' % self.action_plan.uuid)
response = self.get_json('/action_plans/%s' % self.action_plan.uuid,
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_delete_ction_plan_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/action_plans/%s' % uuid, expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestPatch(api_base.FunctionalTest):
def setUp(self):
super(TestPatch, self).setUp()
self.action_plan = obj_utils.create_action_plan_without_audit(
self.context)
p = mock.patch.object(db_api.Connection, 'update_action_plan')
self.mock_action_plan_update = p.start()
self.mock_action_plan_update.side_effect = \
self._simulate_rpc_action_plan_update
self.addCleanup(p.stop)
def _simulate_rpc_action_plan_update(self, action_plan):
action_plan.save()
return action_plan
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_state = 'CANCELLED'
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
self.assertEqual(new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
def test_replace_non_existent_action_plan(self):
response = self.patch_json(
'/action_plans/%s' % utils.generate_uuid(),
[{'path': '/state', 'value': 'CANCELLED',
'op': 'replace'}],
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_add_ok(self):
new_state = 'CANCELLED'
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/state', 'value': new_state, 'op': 'add'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_int)
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
self.assertEqual(new_state, response['state'])
def test_add_non_existent_property(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
self.assertTrue(response.json['error_message'])
def test_remove_ok(self):
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
self.assertIsNotNone(response['state'])
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/state', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
self.assertIsNone(response['state'])
def test_remove_uuid(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_remove_non_existent_property(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_code)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_replace_ok_state_starting(self):
with mock.patch.object(aapi.ApplierAPI,
'launch_action_plan') as applier_mock:
new_state = 'STARTING'
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
applier_mock.assert_called_once_with(mock.ANY,
self.action_plan.uuid)

View File

@@ -0,0 +1,475 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from six.moves.urllib import parse as urlparse
from wsme import types as wtypes
from watcher.api.controllers.v1 import audit_template as api_audit_template
from watcher.common import utils
from watcher.db import api as db_api
from watcher import objects
from watcher.tests.api import base as api_base
from watcher.tests.api import utils as api_utils
from watcher.tests import base
from watcher.tests.objects import utils as obj_utils
class TestAuditTemplateObject(base.TestCase):
def test_audit_template_init(self):
audit_template_dict = api_utils.audit_template_post_data()
del audit_template_dict['name']
audit_template = api_audit_template.AuditTemplate(
**audit_template_dict)
self.assertEqual(wtypes.Unset, audit_template.name)
class TestListAuditTemplate(api_base.FunctionalTest):
def test_empty(self):
response = self.get_json('/audit_templates')
self.assertEqual([], response['audit_templates'])
def _assert_audit_template_fields(self, audit_template):
audit_template_fields = ['name', 'goal', 'host_aggregate']
for field in audit_template_fields:
self.assertIn(field, audit_template)
def test_one(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json('/audit_templates')
self.assertEqual(audit_template.uuid,
response['audit_templates'][0]["uuid"])
self._assert_audit_template_fields(response['audit_templates'][0])
def test_one_soft_deleted(self):
audit_template = obj_utils.create_test_audit_template(self.context)
audit_template.soft_delete()
response = self.get_json('/audit_templates',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit_template.uuid,
response['audit_templates'][0]["uuid"])
self._assert_audit_template_fields(response['audit_templates'][0])
response = self.get_json('/audit_templates')
self.assertEqual([], response['audit_templates'])
def test_get_one_by_uuid(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json(
'/audit_templates/%s' % audit_template['uuid'])
self.assertEqual(audit_template.uuid, response['uuid'])
self._assert_audit_template_fields(response)
def test_get_one_by_name(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json(urlparse.quote(
'/audit_templates/%s' % audit_template['name']))
self.assertEqual(audit_template.uuid, response['uuid'])
self._assert_audit_template_fields(response)
def test_get_one_soft_deleted(self):
audit_template = obj_utils.create_test_audit_template(self.context)
audit_template.soft_delete()
response = self.get_json(
'/audit_templates/%s' % audit_template['uuid'],
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit_template.uuid, response['uuid'])
self._assert_audit_template_fields(response)
response = self.get_json(
'/audit_templates/%s' % audit_template['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_detail(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json('/audit_templates/detail')
self.assertEqual(audit_template.uuid,
response['audit_templates'][0]["uuid"])
self._assert_audit_template_fields(response['audit_templates'][0])
def test_detail_soft_deleted(self):
audit_template = obj_utils.create_test_audit_template(self.context)
audit_template.soft_delete()
response = self.get_json('/audit_templates/detail',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit_template.uuid,
response['audit_templates'][0]["uuid"])
self._assert_audit_template_fields(response['audit_templates'][0])
response = self.get_json('/audit_templates/detail')
self.assertEqual([], response['audit_templates'])
def test_detail_against_single(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json(
'/audit_templates/%s/detail' % audit_template['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
audit_template_list = []
for id_ in range(5):
audit_template = obj_utils.create_test_audit_template(
self.context, id=id_,
uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
audit_template_list.append(audit_template.uuid)
response = self.get_json('/audit_templates')
self.assertEqual(len(audit_template_list),
len(response['audit_templates']))
uuids = [s['uuid'] for s in response['audit_templates']]
self.assertEqual(sorted(audit_template_list), sorted(uuids))
def test_many_without_soft_deleted(self):
audit_template_list = []
for id_ in [1, 2, 3]:
audit_template = obj_utils.create_test_audit_template(
self.context, id=id_, uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
audit_template_list.append(audit_template.uuid)
for id_ in [4, 5]:
audit_template = obj_utils.create_test_audit_template(
self.context, id=id_, uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
audit_template.soft_delete()
response = self.get_json('/audit_templates')
self.assertEqual(3, len(response['audit_templates']))
uuids = [s['uuid'] for s in response['audit_templates']]
self.assertEqual(sorted(audit_template_list), sorted(uuids))
def test_many_with_soft_deleted(self):
audit_template_list = []
for id_ in [1, 2, 3]:
audit_template = obj_utils.create_test_audit_template(
self.context, id=id_, uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
audit_template_list.append(audit_template.uuid)
for id_ in [4, 5]:
audit_template = obj_utils.create_test_audit_template(
self.context, id=id_, uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
audit_template.soft_delete()
audit_template_list.append(audit_template.uuid)
response = self.get_json('/audit_templates',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(5, len(response['audit_templates']))
uuids = [s['uuid'] for s in response['audit_templates']]
self.assertEqual(sorted(audit_template_list), sorted(uuids))
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_audit_template(self.context, id=1, uuid=uuid)
response = self.get_json('/audit_templates/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for l in response['links']:
bookmark = l['rel'] == 'bookmark'
self.assertTrue(self.validate_link(l['href'], bookmark=bookmark))
def test_collection_links(self):
for id_ in range(5):
obj_utils.create_test_audit_template(
self.context, id=id_, uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
response = self.get_json('/audit_templates/?limit=3')
self.assertEqual(3, len(response['audit_templates']))
next_marker = response['audit_templates'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_test_audit_template(
self.context, id=id_, uuid=utils.generate_uuid(),
name='My Audit Template ' + str(id_))
response = self.get_json('/audit_templates')
self.assertEqual(3, len(response['audit_templates']))
next_marker = response['audit_templates'][-1]['uuid']
self.assertIn(next_marker, response['next'])
class TestPatch(api_base.FunctionalTest):
def setUp(self):
super(TestPatch, self).setUp()
self.audit_template = obj_utils.create_test_audit_template(
self.context)
p = mock.patch.object(db_api.Connection, 'update_audit_template')
self.mock_audit_template_update = p.start()
self.mock_audit_template_update.side_effect = \
self._simulate_rpc_audit_template_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_template_update(self, audit_template):
audit_template.save()
return audit_template
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_goal = 'BALANCE_LOAD'
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
self.assertNotEqual(new_goal, response['goal'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
[{'path': '/goal', 'value': new_goal,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
self.assertEqual(new_goal, response['goal'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok_by_name(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_goal = 'BALANCE_LOAD'
response = self.get_json(urlparse.quote(
'/audit_templates/%s' % self.audit_template.name))
self.assertNotEqual(new_goal, response['goal'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.name,
[{'path': '/goal', 'value': new_goal,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.name)
self.assertEqual(new_goal, response['goal'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
def test_replace_non_existent_audit_template(self):
response = self.patch_json(
'/audit_templates/%s' % utils.generate_uuid(),
[{'path': '/goal', 'value': 'BALANCE_LOAD',
'op': 'replace'}],
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_add_ok(self):
new_goal = 'BALANCE_LOAD'
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
[{'path': '/goal', 'value': new_goal, 'op': 'add'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_int)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
self.assertEqual(new_goal, response['goal'])
def test_add_non_existent_property(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
self.assertTrue(response.json['error_message'])
def test_remove_ok(self):
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
self.assertIsNotNone(response['goal'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
[{'path': '/goal', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
self.assertIsNone(response['goal'])
def test_remove_uuid(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_remove_non_existent_property(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_code)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestPost(api_base.FunctionalTest):
def setUp(self):
super(TestPost, self).setUp()
p = mock.patch.object(db_api.Connection, 'create_audit_template')
self.mock_create_audit_template = p.start()
self.mock_create_audit_template.side_effect = (
self._simulate_rpc_audit_template_create)
self.addCleanup(p.stop)
def _simulate_rpc_audit_template_create(self, audit_template):
audit_template.create()
return audit_template
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit_template(self, mock_utcnow):
audit_template_dict = api_utils.audit_template_post_data()
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.post_json('/audit_templates', audit_template_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
# Check location header
self.assertIsNotNone(response.location)
expected_location = \
'/v1/audit_templates/%s' % audit_template_dict['uuid']
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertEqual(audit_template_dict['uuid'], response.json['uuid'])
self.assertNotIn('updated_at', response.json.keys)
self.assertNotIn('deleted_at', response.json.keys)
return_created_at = timeutils.parse_isotime(
response.json['created_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_created_at)
def test_create_audit_template_doesnt_contain_id(self):
with mock.patch.object(
self.dbapi,
'create_audit_template',
wraps=self.dbapi.create_audit_template
) as cn_mock:
audit_template_dict = api_utils.audit_template_post_data(
goal='SERVERS_CONSOLIDATION')
response = self.post_json('/audit_templates', audit_template_dict)
self.assertEqual(audit_template_dict['goal'],
response.json['goal'])
cn_mock.assert_called_once_with(mock.ANY)
# Check that 'id' is not in first arg of positional args
self.assertNotIn('id', cn_mock.call_args[0][0])
def test_create_audit_template_generate_uuid(self):
audit_template_dict = api_utils.audit_template_post_data()
del audit_template_dict['uuid']
response = self.post_json('/audit_templates', audit_template_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(audit_template_dict['goal'], response.json['goal'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
def test_create_audit_template_with_invalid_goal(self):
with mock.patch.object(
self.dbapi,
'create_audit_template',
wraps=self.dbapi.create_audit_template
) as cn_mock:
audit_template_dict = api_utils.audit_template_post_data(
goal='INVALID_GOAL')
response = self.post_json('/audit_templates',
audit_template_dict, expect_errors=True)
self.assertEqual(400, response.status_int)
assert not cn_mock.called
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
self.audit_template = obj_utils.create_test_audit_template(
self.context)
p = mock.patch.object(db_api.Connection, 'update_audit_template')
self.mock_audit_template_update = p.start()
self.mock_audit_template_update.side_effect = \
self._simulate_rpc_audit_template_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_template_update(self, audit_template):
audit_template.save()
return audit_template
@mock.patch('oslo_utils.timeutils.utcnow')
def test_delete_audit_template_by_uuid(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
self.delete('/audit_templates/%s' % self.audit_template.uuid)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid,
expect_errors=True)
# self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
self.context.show_deleted = True
audit_template = objects.AuditTemplate.get_by_uuid(
self.context, self.audit_template.uuid)
return_deleted_at = timeutils.strtime(audit_template['deleted_at'])
self.assertEqual(timeutils.strtime(test_time), return_deleted_at)
@mock.patch('oslo_utils.timeutils.utcnow')
def test_delete_audit_template_by_name(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
self.delete(urlparse.quote('/audit_templates/%s' %
self.audit_template.name))
response = self.get_json(urlparse.quote(
'/audit_templates/%s' % self.audit_template.name),
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
self.context.show_deleted = True
audit_template = objects.AuditTemplate.get_by_name(
self.context, self.audit_template.name)
return_deleted_at = timeutils.strtime(audit_template['deleted_at'])
self.assertEqual(timeutils.strtime(test_time), return_deleted_at)
def test_delete_audit_template_not_found(self):
uuid = utils.generate_uuid()
response = self.delete(
'/audit_templates/%s' % uuid, expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])

View File

@@ -0,0 +1,555 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from wsme import types as wtypes
from six.moves.urllib import parse as urlparse
from watcher.api.controllers.v1 import audit as api_audit
from watcher.common import utils
from watcher.db import api as db_api
from watcher.decision_engine.framework import rpcapi as deapi
from watcher import objects
from watcher.tests.api import base as api_base
from watcher.tests.api import utils as api_utils
from watcher.tests import base
from watcher.tests.db import utils as db_utils
from watcher.tests.objects import utils as obj_utils
def post_get_test_audit(**kw):
audit = api_utils.audit_post_data(**kw)
audit_template = db_utils.get_test_audit_template()
audit['audit_template_id'] = None
audit['audit_template_uuid'] = kw.get('audit_template_uuid',
audit_template['uuid'])
return audit
class TestAuditObject(base.TestCase):
def test_audit_init(self):
audit_dict = api_utils.audit_post_data(audit_template_id=None)
del audit_dict['state']
audit = api_audit.Audit(**audit_dict)
self.assertEqual(wtypes.Unset, audit.state)
class TestListAudit(api_base.FunctionalTest):
def setUp(self):
super(TestListAudit, self).setUp()
obj_utils.create_test_audit_template(self.context)
def test_empty(self):
response = self.get_json('/audits')
self.assertEqual([], response['audits'])
def _assert_audit_fields(self, audit):
audit_fields = ['type', 'deadline', 'state']
for field in audit_fields:
self.assertIn(field, audit)
def test_one(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits')
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
def test_one_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
response = self.get_json('/audits')
self.assertEqual([], response['audits'])
def test_get_one(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/%s' % audit['uuid'])
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
def test_get_one_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits/%s' % audit['uuid'],
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
response = self.get_json('/audits/%s' % audit['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_detail(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/detail')
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
def test_detail_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits/detail',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['audits'][0]["uuid"])
self._assert_audit_fields(response['audits'][0])
response = self.get_json('/audits/detail')
self.assertEqual([], response['audits'])
def test_detail_against_single(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/%s/detail' % audit['uuid'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
audit_list = []
for id_ in range(5):
audit = obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
audit_list.append(audit.uuid)
response = self.get_json('/audits')
self.assertEqual(len(audit_list), len(response['audits']))
uuids = [s['uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_list), sorted(uuids))
def test_many_without_soft_deleted(self):
audit_list = []
for id_ in [1, 2, 3]:
audit = obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
audit_list.append(audit.uuid)
for id_ in [4, 5]:
audit = obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
audit.soft_delete()
response = self.get_json('/audits')
self.assertEqual(3, len(response['audits']))
uuids = [s['uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_list), sorted(uuids))
def test_many_with_soft_deleted(self):
audit_list = []
for id_ in [1, 2, 3]:
audit = obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
audit_list.append(audit.uuid)
for id_ in [4, 5]:
audit = obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
audit.soft_delete()
audit_list.append(audit.uuid)
response = self.get_json('/audits',
headers={'X-Show-Deleted': 'True'})
self.assertEqual(5, len(response['audits']))
uuids = [s['uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_list), sorted(uuids))
def test_many_with_sort_key_audit_template_uuid(self):
audit_template_list = []
for id_ in range(5):
audit_template = obj_utils.create_test_audit_template(
self.context,
name='at' + str(id_),
uuid=utils.generate_uuid())
obj_utils.create_test_audit(
self.context, id=id_, uuid=utils.generate_uuid(),
audit_template_id=audit_template.id)
audit_template_list.append(audit_template.uuid)
response = self.get_json('/audits/?sort_key=audit_template_uuid')
self.assertEqual(5, len(response['audits']))
uuids = [s['audit_template_uuid'] for s in response['audits']]
self.assertEqual(sorted(audit_template_list), uuids)
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_audit(self.context, id=1, uuid=uuid)
response = self.get_json('/audits/%s' % uuid)
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
for l in response['links']:
bookmark = l['rel'] == 'bookmark'
self.assertTrue(self.validate_link(l['href'], bookmark=bookmark))
def test_collection_links(self):
for id_ in range(5):
obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/audits/?limit=3')
self.assertEqual(3, len(response['audits']))
next_marker = response['audits'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_collection_links_default_limit(self):
cfg.CONF.set_override('max_limit', 3, 'api')
for id_ in range(5):
obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/audits')
self.assertEqual(3, len(response['audits']))
next_marker = response['audits'][-1]['uuid']
self.assertIn(next_marker, response['next'])
def test_filter_by_audit_template_uuid(self):
audit_template_uuid = utils.generate_uuid()
audit_template_name = 'My_Audit_Template'
audit_template = obj_utils.create_test_audit_template(
self.context,
uuid=audit_template_uuid,
name=audit_template_name)
number_of_audits_with_audit_template_id = 5
for id_ in range(number_of_audits_with_audit_template_id):
obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid(),
audit_template_id=audit_template.id)
for id_ in range(6, 8):
obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/audits/?audit_template=%s'
% audit_template_uuid)
audits = response['audits']
self.assertEqual(5, len(audits))
for audit in audits:
self.assertEqual(audit_template_uuid,
audit['audit_template_uuid'])
def test_filter_by_audit_template_name(self):
audit_template_uuid = utils.generate_uuid()
audit_template_name = 'My_Audit_Template'
audit_template = obj_utils.create_test_audit_template(
self.context,
uuid=audit_template_uuid,
name=audit_template_name)
number_of_audits_with_audit_template_id = 5
for id_ in range(number_of_audits_with_audit_template_id):
obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid(),
audit_template_id=audit_template.id)
for id_ in range(6, 8):
obj_utils.create_test_audit(self.context, id=id_,
uuid=utils.generate_uuid())
response = self.get_json('/audits/?audit_template=%s'
% audit_template_name)
audits = response['audits']
self.assertEqual(5, len(audits))
for audit in audits:
self.assertEqual(audit_template_uuid,
audit['audit_template_uuid'])
def test_many_by_soft_deleted_audit_template(self):
audit_list = []
audit_template1 = obj_utils.create_test_audit_template(
self.context,
uuid=utils.generate_uuid(),
name='at1',
id=3,
)
audit_template2 = obj_utils.create_test_audit_template(
self.context,
uuid=utils.generate_uuid(),
name='at2',
id=4,
)
for id_ in range(0, 2):
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(),
audit_template_id=audit_template1.id)
audit_list.append(audit.uuid)
for id_ in range(2, 4):
audit = obj_utils.create_test_audit(
self.context, id=id_,
uuid=utils.generate_uuid(),
audit_template_id=audit_template2.id)
audit_list.append(audit.uuid)
self.delete('/audit_templates/%s' % audit_template1.uuid)
response = self.get_json('/audits')
self.assertEqual(len(audit_list), len(response['audits']))
for id_ in range(0, 2):
audit = response['audits'][id_]
self.assertEqual(None, audit['audit_template_uuid'])
for id_ in range(2, 4):
audit = response['audits'][id_]
self.assertEqual(audit_template2.uuid,
audit['audit_template_uuid'])
class TestPatch(api_base.FunctionalTest):
def setUp(self):
super(TestPatch, self).setUp()
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context)
p = mock.patch.object(db_api.Connection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
@mock.patch('oslo_utils.timeutils.utcnow')
def test_replace_ok(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
new_state = 'SUBMITTED'
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
def test_replace_non_existent_audit(self):
response = self.patch_json('/audits/%s' % utils.generate_uuid(),
[{'path': '/state', 'value': 'SUBMITTED',
'op': 'replace'}],
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_add_ok(self):
new_state = 'SUCCESS'
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/state', 'value': new_state, 'op': 'add'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_int)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertEqual(new_state, response['state'])
def test_add_non_existent_property(self):
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(400, response.status_int)
self.assertTrue(response.json['error_message'])
def test_remove_ok(self):
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertIsNotNone(response['state'])
response = self.patch_json('/audits/%s' % self.audit.uuid,
[{'path': '/state', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(200, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
self.assertIsNone(response['state'])
def test_remove_uuid(self):
response = self.patch_json('/audits/%s' % self.audit.uuid,
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
def test_remove_non_existent_property(self):
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(400, response.status_code)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
class TestPost(api_base.FunctionalTest):
def setUp(self):
super(TestPost, self).setUp()
obj_utils.create_test_audit_template(self.context)
p = mock.patch.object(db_api.Connection, 'create_audit')
self.mock_create_audit = p.start()
self.mock_create_audit.side_effect = (
self._simulate_rpc_audit_create)
self.addCleanup(p.stop)
def _simulate_rpc_audit_create(self, audit):
audit.create()
return audit
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_create_audit(self, mock_utcnow, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
audit_dict = post_get_test_audit()
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
# Check location header
self.assertIsNotNone(response.location)
expected_location = '/v1/audits/%s' % audit_dict['uuid']
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertEqual(audit_dict['uuid'], response.json['uuid'])
self.assertEqual(objects.audit.AuditStatus.PENDING,
response.json['state'])
self.assertNotIn('updated_at', response.json.keys)
self.assertNotIn('deleted_at', response.json.keys)
return_created_at = timeutils.parse_isotime(
response.json['created_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_created_at)
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_doesnt_contain_id(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit(state='ONGOING')
with mock.patch.object(self.dbapi, 'create_audit',
wraps=self.dbapi.create_audit) as cn_mock:
response = self.post_json('/audits', audit_dict)
self.assertEqual(audit_dict['state'], response.json['state'])
cn_mock.assert_called_once_with(mock.ANY)
# Check that 'id' is not in first arg of positional args
self.assertNotIn('id', cn_mock.call_args[0][0])
@mock.patch.object(deapi.DecisionEngineAPI, 'trigger_audit')
def test_create_audit_generate_uuid(self, mock_trigger_audit):
mock_trigger_audit.return_value = mock.ANY
audit_dict = post_get_test_audit()
del audit_dict['uuid']
response = self.post_json('/audits', audit_dict)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
self.assertEqual(objects.audit.AuditStatus.PENDING,
response.json['state'])
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
def test_create_audit_trigger_decision_engine(self):
with mock.patch.object(deapi.DecisionEngineAPI,
'trigger_audit') as de_mock:
audit_dict = post_get_test_audit(state='ONGOING')
self.post_json('/audits', audit_dict)
de_mock.assert_called_once_with(mock.ANY, audit_dict['uuid'])
# class TestDelete(api_base.FunctionalTest):
# def setUp(self):
# super(TestDelete, self).setUp()
# self.audit = obj_utils.create_test_audit(self.context)
# p = mock.patch.object(db_api.Connection, 'destroy_audit')
# self.mock_audit_delete = p.start()
# self.mock_audit_delete.side_effect = self._simulate_rpc_audit_delete
# self.addCleanup(p.stop)
# def _simulate_rpc_audit_delete(self, audit_uuid):
# audit = objects.Audit.get_by_uuid(self.context, audit_uuid)
# audit.destroy()
# def test_delete_audit(self):
# self.delete('/audits/%s' % self.audit.uuid)
# response = self.get_json('/audits/%s' % self.audit.uuid,
# expect_errors=True)
# self.assertEqual(404, response.status_int)
# self.assertEqual('application/json', response.content_type)
# self.assertTrue(response.json['error_message'])
# def test_delete_audit_not_found(self):
# uuid = utils.generate_uuid()
# response = self.delete('/audits/%s' % uuid, expect_errors=True)
# self.assertEqual(404, response.status_int)
# self.assertEqual('application/json', response.content_type)
# self.assertTrue(response.json['error_message'])
class TestDelete(api_base.FunctionalTest):
def setUp(self):
super(TestDelete, self).setUp()
obj_utils.create_test_audit_template(self.context)
self.audit = obj_utils.create_test_audit(self.context)
p = mock.patch.object(db_api.Connection, 'update_audit')
self.mock_audit_update = p.start()
self.mock_audit_update.side_effect = self._simulate_rpc_audit_update
self.addCleanup(p.stop)
def _simulate_rpc_audit_update(self, audit):
audit.save()
return audit
@mock.patch('oslo_utils.timeutils.utcnow')
def test_delete_audit(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
self.delete('/audits/%s' % self.audit.uuid)
response = self.get_json('/audits/%s' % self.audit.uuid,
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
self.context.show_deleted = True
audit = objects.Audit.get_by_uuid(self.context, self.audit.uuid)
return_deleted_at = timeutils.strtime(audit['deleted_at'])
self.assertEqual(timeutils.strtime(test_time), return_deleted_at)
self.assertEqual(audit['state'], 'DELETED')
def test_delete_audit_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/audits/%s' % uuid, expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])

View File

@@ -0,0 +1,20 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcher.tests.api import base as api_base
class TestV1Routing(api_base.FunctionalTest):
def setUp(self):
super(TestV1Routing, self).setUp()

View File

@@ -0,0 +1,252 @@
# coding: utf-8
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import webtest
import wsme
from wsme import types as wtypes
from watcher.api.controllers.v1 import types
from watcher.common import exception
from watcher.common import utils
from watcher.tests import base
class TestUuidType(base.TestCase):
def test_valid_uuid(self):
test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e'
self.assertEqual(test_uuid, types.UuidType.validate(test_uuid))
def test_invalid_uuid(self):
self.assertRaises(exception.InvalidUUID,
types.UuidType.validate, 'invalid-uuid')
class TestNameType(base.TestCase):
def test_valid_name(self):
test_name = 'hal-9000'
self.assertEqual(test_name, types.NameType.validate(test_name))
def test_invalid_name(self):
self.assertRaises(exception.InvalidName,
types.NameType.validate, '-this is not valid-')
class TestUuidOrNameType(base.TestCase):
@mock.patch.object(utils, 'is_uuid_like')
@mock.patch.object(utils, 'is_hostname_safe')
def test_valid_uuid(self, host_mock, uuid_mock):
test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e'
host_mock.return_value = False
uuid_mock.return_value = True
self.assertTrue(types.UuidOrNameType.validate(test_uuid))
uuid_mock.assert_called_once_with(test_uuid)
@mock.patch.object(utils, 'is_uuid_like')
@mock.patch.object(utils, 'is_hostname_safe')
def test_valid_name(self, host_mock, uuid_mock):
test_name = 'dc16-database5'
uuid_mock.return_value = False
host_mock.return_value = True
self.assertTrue(types.UuidOrNameType.validate(test_name))
host_mock.assert_called_once_with(test_name)
def test_invalid_uuid_or_name(self):
self.assertRaises(exception.InvalidUuidOrName,
types.UuidOrNameType.validate, 'inval#uuid%or*name')
class MyPatchType(types.JsonPatchType):
"""Helper class for TestJsonPatchType tests."""
@staticmethod
def mandatory_attrs():
return ['/mandatory']
@staticmethod
def internal_attrs():
return ['/internal']
class MyRoot(wsme.WSRoot):
"""Helper class for TestJsonPatchType tests."""
@wsme.expose([wsme.types.text], body=[MyPatchType])
@wsme.validate([MyPatchType])
def test(self, patch):
return patch
class TestJsonPatchType(base.TestCase):
def setUp(self):
super(TestJsonPatchType, self).setUp()
self.app = webtest.TestApp(MyRoot(['restjson']).wsgiapp())
def _patch_json(self, params, expect_errors=False):
return self.app.patch_json(
'/test',
params=params,
headers={'Accept': 'application/json'},
expect_errors=expect_errors
)
def test_valid_patches(self):
valid_patches = [{'path': '/extra/foo', 'op': 'remove'},
{'path': '/extra/foo', 'op': 'add', 'value': 'bar'},
{'path': '/str', 'op': 'replace', 'value': 'bar'},
{'path': '/bool', 'op': 'add', 'value': True},
{'path': '/int', 'op': 'add', 'value': 1},
{'path': '/float', 'op': 'add', 'value': 0.123},
{'path': '/list', 'op': 'add', 'value': [1, 2]},
{'path': '/none', 'op': 'add', 'value': None},
{'path': '/empty_dict', 'op': 'add', 'value': {}},
{'path': '/empty_list', 'op': 'add', 'value': []},
{'path': '/dict', 'op': 'add',
'value': {'cat': 'meow'}}]
ret = self._patch_json(valid_patches, False)
self.assertEqual(200, ret.status_int)
self.assertEqual(sorted(valid_patches), sorted(ret.json))
def test_cannot_update_internal_attr(self):
patch = [{'path': '/internal', 'op': 'replace', 'value': 'foo'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_cannot_update_internal_dict_attr(self):
patch = [{'path': '/internal', 'op': 'replace',
'value': 'foo'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_mandatory_attr(self):
patch = [{'op': 'replace', 'path': '/mandatory', 'value': 'foo'}]
ret = self._patch_json(patch, False)
self.assertEqual(200, ret.status_int)
self.assertEqual(patch, ret.json)
def test_cannot_remove_mandatory_attr(self):
patch = [{'op': 'remove', 'path': '/mandatory'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_missing_required_fields_path(self):
missing_path = [{'op': 'remove'}]
ret = self._patch_json(missing_path, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_missing_required_fields_op(self):
missing_op = [{'path': '/foo'}]
ret = self._patch_json(missing_op, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_invalid_op(self):
patch = [{'path': '/foo', 'op': 'invalid'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_invalid_path(self):
patch = [{'path': 'invalid-path', 'op': 'remove'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_cannot_add_with_no_value(self):
patch = [{'path': '/extra/foo', 'op': 'add'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
def test_cannot_replace_with_no_value(self):
patch = [{'path': '/foo', 'op': 'replace'}]
ret = self._patch_json(patch, True)
self.assertEqual(400, ret.status_int)
self.assertTrue(ret.json['faultstring'])
class TestBooleanType(base.TestCase):
def test_valid_true_values(self):
v = types.BooleanType()
self.assertTrue(v.validate("true"))
self.assertTrue(v.validate("TRUE"))
self.assertTrue(v.validate("True"))
self.assertTrue(v.validate("t"))
self.assertTrue(v.validate("1"))
self.assertTrue(v.validate("y"))
self.assertTrue(v.validate("yes"))
self.assertTrue(v.validate("on"))
def test_valid_false_values(self):
v = types.BooleanType()
self.assertFalse(v.validate("false"))
self.assertFalse(v.validate("FALSE"))
self.assertFalse(v.validate("False"))
self.assertFalse(v.validate("f"))
self.assertFalse(v.validate("0"))
self.assertFalse(v.validate("n"))
self.assertFalse(v.validate("no"))
self.assertFalse(v.validate("off"))
def test_invalid_value(self):
v = types.BooleanType()
self.assertRaises(exception.Invalid, v.validate, "invalid-value")
self.assertRaises(exception.Invalid, v.validate, "01")
class TestJsonType(base.TestCase):
def test_valid_values(self):
vt = types.jsontype
value = vt.validate("hello")
self.assertEqual("hello", value)
value = vt.validate(10)
self.assertEqual(10, value)
value = vt.validate(0.123)
self.assertEqual(0.123, value)
value = vt.validate(True)
self.assertEqual(True, value)
value = vt.validate([1, 2, 3])
self.assertEqual([1, 2, 3], value)
value = vt.validate({'foo': 'bar'})
self.assertEqual({'foo': 'bar'}, value)
value = vt.validate(None)
self.assertEqual(None, value)
def test_invalid_values(self):
vt = types.jsontype
self.assertRaises(exception.Invalid, vt.validate, object())
def test_apimultitype_tostring(self):
vts = str(types.jsontype)
self.assertIn(str(wtypes.text), vts)
self.assertIn(str(int), vts)
self.assertIn(str(long), vts)
self.assertIn(str(float), vts)
self.assertIn(str(types.BooleanType), vts)
self.assertIn(str(list), vts)
self.assertIn(str(dict), vts)
self.assertIn(str(None), vts)

View File

@@ -0,0 +1,49 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import wsme
from watcher.api.controllers.v1 import utils
from watcher.tests import base
from oslo_config import cfg
CONF = cfg.CONF
class TestApiUtils(base.TestCase):
def test_validate_limit(self):
limit = utils.validate_limit(10)
self.assertEqual(10, 10)
# max limit
limit = utils.validate_limit(999999999)
self.assertEqual(CONF.api.max_limit, limit)
# negative
self.assertRaises(wsme.exc.ClientSideError, utils.validate_limit, -1)
# zero
self.assertRaises(wsme.exc.ClientSideError, utils.validate_limit, 0)
def test_validate_sort_dir(self):
sort_dir = utils.validate_sort_dir('asc')
self.assertEqual('asc', sort_dir)
# invalid sort_dir parameter
self.assertRaises(wsme.exc.ClientSideError,
utils.validate_sort_dir,
'fake-sort')