Refactored existing tempest API tests

We must set up Tempest for Watcher
(http://docs.openstack.org/developer/tempest/configuration.html)
to run integration tests inside devstack environment.

This patchset is a refactoring of the stale Tempest tests to now
use the latest Tempest coding standards (like using plugins and
credentials factory).

This commit will have an effect on the doc as we need to integrate
Tempest in the Watcher documentation.

DocImpact
Partially Implements: blueprint tempest-basic-set-up
Change-Id: I7600ff8a28d524b56c7dd4903ac4d203634ae412
This commit is contained in:
Vincent Françoise
2016-01-08 15:44:27 +01:00
committed by Jean-Emile DARTOIS
parent 8832ad78e2
commit 595b13a622
27 changed files with 659 additions and 903 deletions

View File

@@ -0,0 +1,133 @@
..
Except where otherwise noted, this document is licensed under Creative
Commons Attribution 3.0 License. You can view the license at:
https://creativecommons.org/licenses/by/3.0/
.. _tempest_integration:
=============
Tempest tests
=============
This directory contains Tempest tests to cover Watcher project.
The following procedure gets you started with Tempest testing but you can also
refer to the `Tempest documentation`_ for more details.
.. _Tempest documentation: http://docs.openstack.org/developer/tempest/
Tempest installation
====================
To install Tempest you can issue the following commands::
$ git clone https://github.com/openstack/tempest/
$ cd tempest/
$ pip install .
The folder you are into now will be called ``<TEMPEST_DIR>`` from now onwards.
Please note that although it is fully working outside a virtual environment, it
is recommended to install within a venv.
Watcher Tempest testing setup
=============================
You can now install Watcher alongside it in development mode by issuing the
following command::
$ pip install -e <WATCHER_SRC_DIR>
Then setup a local working environment (here ``watcher-cloud``) for running
Tempest for Watcher which shall contain the configuration for your OpenStack
intergration platform.
In a virtual environment, you can do so by issuing the following command::
$ cd <TEMPEST_DIR>
$ tempest init watcher-cloud
Otherwise, if you are not using a virtualenv::
$ cd <TEMPEST_DIR>
$ tempest init --config-dir ./etc watcher-cloud
By default the configuration file is empty so before starting, you need to issue the following commands::
$ cd <TEMPEST_DIR>/watcher-cloud/etc
$ cp tempest.conf.sample tempest.conf
At this point you need to edit the ``watcher-cloud/etc/tempest.conf``
file as described in the `Tempest configuration guide`_.
Shown below is a minimal configuration you need to set within your
``tempest.conf`` configuration file which can get you started.
For Keystone V3::
uri_v3 = http://<KEYSTONE_PUBLIC_ENDPOINT_IP>:<KEYSTONE_PORT>/v3
admin_tenant_name = <ADMIN_TENANT_NAME>
admin_username = <ADMIN_USERNAME>
admin_password = <ADMIN_PASSWORD>
admin_domain_name = <ADMIN_DOMAIN_NAME>
api_v2 = false
api_v3 = true
auth_version = v3
For Keystone V2::
uri = http://<KEYSTONE_PUBLIC_ENDPOINT_IP>:<KEYSTONE_PORT>/v2.0
admin_tenant_name = <ADMIN_TENANT_NAME>
admin_username = <ADMIN_USERNAME>
admin_password = <ADMIN_PASSWORD>
auth_version = v2
For more information, please refer to:
- Keystone connection: http://docs.openstack.org/developer/tempest/configuration.html#keystone-connection-info
- Dynamic Keystone Credentials: http://docs.openstack.org/developer/tempest/configuration.html#dynamic-credentials
.. _virtual environment: http://docs.python-guide.org/en/latest/dev/virtualenvs/
.. _Tempest configuration guide: http://docs.openstack.org/developer/tempest/configuration.html
Watcher Tempest tests execution
===============================
To list all Watcher Tempest cases, you can issue the following commands::
$ cd <TEMPEST_DIR>
$ testr list-tests watcher
To run only these tests in Tempest, you can then issue these commands::
$ ./run_tempest.sh --config watcher-cloud/etc/tempest.conf -N -- watcher
Or alternatively the following commands if you are::
$ cd <TEMPEST_DIR>/watcher-cloud
$ ../run_tempest.sh -N -- watcher
To run a single test case, go to Tempest directory, then run with test case
name, e.g.::
$ cd <TEMPEST_DIR>
$ ./run_tempest.sh --config watcher-cloud/etc/tempest.conf -N \
-- watcher_tempest_plugin.tests.api.admin.test_audit_template.TestCreateDeleteAuditTemplate.test_create_audit_template
Alternatively, you can also run the Watcher Tempest plugin tests using tox. But
before you can do so, you need to follow the Tempest explanation on running
`tox with plugins`_. Then, run::
$ export TEMPEST_CONFIG_DIR=<TEMPEST_DIR>/watcher-cloud/etc/
$ tox -eall-plugin watcher
.. _tox with plugins: http://docs.openstack.org/developer/tempest/plugin.html#notes-for-using-plugins-with-virtualenvs
And, to run a specific test::
$ export TEMPEST_CONFIG_DIR=<TEMPEST_DIR>/watcher-cloud/etc/
$ tox -eall-plugin watcher_tempest_plugin.tests.api.admin.test_audit_template.TestCreateDeleteAuditTemplate.test_create_audit_template

View File

View File

@@ -0,0 +1,27 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
service_available_group = cfg.OptGroup(name="service_available",
title="Available OpenStack Services")
ServiceAvailableGroup = [
cfg.BoolOpt("watcher",
default=True,
help="Whether or not watcher is expected to be available"),
]

View File

@@ -0,0 +1,45 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from tempest import clients
from tempest.common import credentials_factory as creds_factory
from tempest import config
from watcher_tempest_plugin.services.infra_optim.v1.json import client as ioc
CONF = config.CONF
@six.add_metaclass(abc.ABCMeta)
class BaseManager(clients.Manager):
def __init__(self, credentials, service=None, api_microversions=None):
super(BaseManager, self).__init__(
credentials, service, api_microversions)
self.io_client = ioc.InfraOptimClientJSON(
self.auth_provider, 'infra-optim', CONF.identity.region)
class AdminManager(BaseManager):
def __init__(self, service=None, api_microversions=None):
super(AdminManager, self).__init__(
creds_factory.get_configured_credentials('identity_admin'),
service,
api_microversions
)

View File

@@ -0,0 +1,36 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from tempest import config
from tempest.test_discover import plugins
from watcher_tempest_plugin import config as watcher_config
class WatcherTempestPlugin(plugins.TempestPlugin):
def load_tests(self):
base_path = os.path.split(os.path.dirname(
os.path.abspath(__file__)))[0]
test_dir = "watcher_tempest_plugin/tests"
full_test_dir = os.path.join(base_path, test_dir)
return full_test_dir, base_path
def register_opts(self, conf):
config.register_opt_group(
conf, watcher_config.service_available_group,
watcher_config.ServiceAvailableGroup)
def get_opt_lists(self):
return []

View File

@@ -0,0 +1,211 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import functools
import six
import six.moves.urllib.parse as urlparse
from tempest.common import service_client
def handle_errors(f):
"""A decorator that allows to ignore certain types of errors."""
@functools.wraps(f)
def wrapper(*args, **kwargs):
param_name = 'ignore_errors'
ignored_errors = kwargs.get(param_name, tuple())
if param_name in kwargs:
del kwargs[param_name]
try:
return f(*args, **kwargs)
except ignored_errors:
# Silently ignore errors
pass
return wrapper
@six.add_metaclass(abc.ABCMeta)
class BaseInfraOptimClient(service_client.ServiceClient):
"""Base Tempest REST client for Watcher API."""
URI_PREFIX = ''
@abc.abstractmethod
def serialize(self, object_dict):
"""Serialize an Watcher object."""
raise NotImplementedError()
@abc.abstractmethod
def deserialize(self, object_str):
"""Deserialize an Watcher object."""
raise NotImplementedError()
def _get_uri(self, resource_name, uuid=None, permanent=False):
"""Get URI for a specific resource or object.
:param resource_name: The name of the REST resource, e.g., 'audits'.
:param uuid: The unique identifier of an object in UUID format.
:return: Relative URI for the resource or object.
"""
prefix = self.URI_PREFIX if not permanent else ''
return '{pref}/{res}{uuid}'.format(pref=prefix,
res=resource_name,
uuid='/%s' % uuid if uuid else '')
def _make_patch(self, allowed_attributes, **kw):
"""Create a JSON patch according to RFC 6902.
:param allowed_attributes: An iterable object that contains a set of
allowed attributes for an object.
:param **kw: Attributes and new values for them.
:return: A JSON path that sets values of the specified attributes to
the new ones.
"""
def get_change(kw, path='/'):
for name, value in kw.items():
if isinstance(value, dict):
for ch in get_change(value, path + '%s/' % name):
yield ch
else:
if value is None:
yield {'path': path + name,
'op': 'remove'}
else:
yield {'path': path + name,
'value': value,
'op': 'replace'}
patch = [ch for ch in get_change(kw)
if ch['path'].lstrip('/') in allowed_attributes]
return patch
def _list_request(self, resource, permanent=False, **kwargs):
"""Get the list of objects of the specified type.
:param resource: The name of the REST resource, e.g., 'audits'.
"param **kw: Parameters for the request.
:return: A tuple with the server response and deserialized JSON list
of objects
"""
uri = self._get_uri(resource, permanent=permanent)
if kwargs:
uri += "?%s" % urlparse.urlencode(kwargs)
resp, body = self.get(uri)
self.expected_success(200, resp['status'])
return resp, self.deserialize(body)
def _show_request(self, resource, uuid, permanent=False, **kwargs):
"""Gets a specific object of the specified type.
:param uuid: Unique identifier of the object in UUID format.
:return: Serialized object as a dictionary.
"""
if 'uri' in kwargs:
uri = kwargs['uri']
else:
uri = self._get_uri(resource, uuid=uuid, permanent=permanent)
resp, body = self.get(uri)
self.expected_success(200, resp['status'])
return resp, self.deserialize(body)
def _create_request(self, resource, object_dict):
"""Create an object of the specified type.
:param resource: The name of the REST resource, e.g., 'audits'.
:param object_dict: A Python dict that represents an object of the
specified type.
:return: A tuple with the server response and the deserialized created
object.
"""
body = self.serialize(object_dict)
uri = self._get_uri(resource)
resp, body = self.post(uri, body=body)
self.expected_success(201, resp['status'])
return resp, self.deserialize(body)
def _delete_request(self, resource, uuid):
"""Delete specified object.
:param resource: The name of the REST resource, e.g., 'audits'.
:param uuid: The unique identifier of an object in UUID format.
:return: A tuple with the server response and the response body.
"""
uri = self._get_uri(resource, uuid)
resp, body = self.delete(uri)
self.expected_success(204, resp['status'])
return resp, body
def _patch_request(self, resource, uuid, patch_object):
"""Update specified object with JSON-patch.
:param resource: The name of the REST resource, e.g., 'audits'.
:param uuid: The unique identifier of an object in UUID format.
:return: A tuple with the server response and the serialized patched
object.
"""
uri = self._get_uri(resource, uuid)
patch_body = self.serialize(patch_object)
resp, body = self.patch(uri, body=patch_body)
self.expected_success(200, resp['status'])
return resp, self.deserialize(body)
@handle_errors
def get_api_description(self):
"""Retrieves all versions of the Watcher API."""
return self._list_request('', permanent=True)
@handle_errors
def get_version_description(self, version='v1'):
"""Retrieves the description of the API.
:param version: The version of the API. Default: 'v1'.
:return: Serialized description of API resources.
"""
return self._list_request(version, permanent=True)
def _put_request(self, resource, put_object):
"""Update specified object with JSON-patch."""
uri = self._get_uri(resource)
put_body = self.serialize(put_object)
resp, body = self.put(uri, body=put_body)
self.expected_success(202, resp['status'])
return resp, body

View File

@@ -0,0 +1,146 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import uuid
from watcher_tempest_plugin.services.infra_optim import base
class InfraOptimClientJSON(base.BaseInfraOptimClient):
"""Base Tempest REST client for Watcher API v1."""
URI_PREFIX = 'v1'
def serialize(self, object_dict):
"""Serialize an Watcher object."""
return json.dumps(object_dict)
def deserialize(self, object_str):
"""Deserialize an Watcher object."""
return json.loads(object_str.decode('utf-8'))
# ### AUDIT TEMPLATES ### #
@base.handle_errors
def list_audit_templates(self, **kwargs):
"""List all existing audit templates."""
return self._list_request('audit_templates', **kwargs)
@base.handle_errors
def list_audit_template_audits(self, audit_template_uuid):
"""Lists all audits associated with a audit template."""
return self._list_request(
'/audit_templates/%s/audits' % audit_template_uuid)
@base.handle_errors
def list_audit_templates_detail(self, **kwargs):
"""Lists details of all existing audit templates."""
return self._list_request('/audit_templates/detail', **kwargs)
@base.handle_errors
def show_audit_template(self, audit_template_uuid):
"""Gets a specific audit template.
:param audit_template_uuid: Unique identifier of the audit template
:return: Serialized audit template as a dictionary.
"""
return self._show_request('audit_templates', audit_template_uuid)
@base.handle_errors
def filter_audit_template_by_host_aggregate(self, host_aggregate):
"""Gets an audit template associated with given host agregate ID.
:param host_aggregate: Unique identifier of the host aggregate
:return: Serialized audit template as a dictionary.
"""
return self._list_request('/audit_templates',
host_aggregate=host_aggregate)
@base.handle_errors
def filter_audit_template_by_goal(self, goal):
"""Gets an audit template associated with given goal.
:param goal: goal identifier
:return: Serialized audit template as a dictionary.
"""
return self._list_request('/audit_templates', goal=goal)
@base.handle_errors
def create_audit_template(self, **kwargs):
"""Creates an audit template with the specified parameters.
:param name: The name of the audit template. Default: My Audit Template
:param description: The description of the audit template.
Default: AT Description
:param goal: The goal associated within the audit template.
Default: DUMMY
:param host_aggregate: ID of the host aggregate targeted by
this audit template. Default: 1
:param extra: IMetadata associated to this audit template.
Default: {}
:return: A tuple with the server response and the created audit
template.
"""
parameters = {k: v for k, v in kwargs.items() if v is not None}
# This name is unique to avoid the DB unique constraint on names
unique_name = 'Tempest Audit Template %s' % uuid.uuid4()
audit_template = {
'name': parameters.get('name', unique_name),
'description': parameters.get('description', ''),
'goal': parameters.get('goal', 'DUMMY'),
'host_aggregate': parameters.get('host_aggregate', 1),
'extra': parameters.get('extra', {}),
}
return self._create_request('audit_templates', audit_template)
@base.handle_errors
def create_audit(self, audit_template_uuid, **kwargs):
"""Create an audit with the specified parameters.
:param audit_template_uuid: Audit template ID used by the audit
:return: A tuple with the server response and the created audit.
"""
audit = {'audit_template_uuid': audit_template_uuid}
audit.update(kwargs)
return self._create_request('audits', audit)
@base.handle_errors
def delete_audit_template(self, audit_template_uuid):
"""Deletes an audit template having the specified UUID.
:param audit_template_uuid: The unique identifier of the audit template
:return: A tuple with the server response and the response body.
"""
return self._delete_request('audit_templates', audit_template_uuid)
@base.handle_errors
def update_audit_template(self, audit_template_uuid, patch):
"""Update the specified audit template.
:param audit_template_uuid: The unique identifier of the audit template
:param patch: List of dicts representing json patches.
:return: A tuple with the server response and the updated audit
template.
"""
return self._patch_request('audit_templates',
audit_template_uuid, patch)

View File

View File

@@ -0,0 +1,139 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from tempest import test
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
from watcher_tempest_plugin import infra_optim_clients as clients
# Resources must be deleted in a specific order, this list
# defines the resource types to clean up, and the correct order.
RESOURCE_TYPES = ['audit_template']
# RESOURCE_TYPES = ['action', 'action_plan', 'audit', 'audit_template']
def creates(resource):
"""Decorator that adds resources to the appropriate cleanup list."""
def decorator(f):
@functools.wraps(f)
def wrapper(cls, *args, **kwargs):
resp, body = f(cls, *args, **kwargs)
if 'uuid' in body:
cls.created_objects[resource].add(body['uuid'])
return resp, body
return wrapper
return decorator
class BaseInfraOptimTest(test.BaseTestCase):
"""Base class for Infrastructure Optimization API tests."""
@classmethod
def setup_credentials(cls):
super(BaseInfraOptimTest, cls).setup_credentials()
cls.mgr = clients.AdminManager()
@classmethod
def setup_clients(cls):
super(BaseInfraOptimTest, cls).setup_clients()
cls.client = cls.mgr.io_client
@classmethod
def resource_setup(cls):
super(BaseInfraOptimTest, cls).resource_setup()
cls.created_objects = {}
for resource in RESOURCE_TYPES:
cls.created_objects[resource] = set()
@classmethod
def resource_cleanup(cls):
"""Ensure that all created objects get destroyed."""
try:
for resource in RESOURCE_TYPES:
uuids = cls.created_objects[resource]
delete_method = getattr(cls.client, 'delete_%s' % resource)
for u in uuids:
delete_method(u, ignore_errors=lib_exc.NotFound)
finally:
super(BaseInfraOptimTest, cls).resource_cleanup()
def validate_self_link(self, resource, uuid, link):
"""Check whether the given self link formatted correctly."""
expected_link = "{base}/{pref}/{res}/{uuid}".format(
base=self.client.base_url,
pref=self.client.URI_PREFIX,
res=resource,
uuid=uuid
)
self.assertEqual(expected_link, link)
def assert_expected(self, expected, actual,
keys=('created_at', 'updated_at', 'deleted_at')):
# Check if not expected keys/values exists in actual response body
for key, value in expected.items():
if key not in keys:
self.assertIn(key, actual)
self.assertEqual(value, actual[key])
# ### AUDIT TEMPLATES ### #
@classmethod
@creates('audit_template')
def create_audit_template(cls, name=None, description=None, goal=None,
host_aggregate=None, extra=None):
"""Wrapper utility for creating a test audit template
:param name: The name of the audit template. Default: My Audit Template
:param description: The description of the audit template.
Default: AT Description
:param goal: The goal associated within the audit template.
Default: DUMMY
:param host_aggregate: ID of the host aggregate targeted by
this audit template. Default: 1
:param extra: IMetadata associated to this audit template.
Default: {}
:return: A tuple with The HTTP response and its body
"""
description = description or data_utils.rand_name(
'test-audit_template')
resp, body = cls.client.create_audit_template(
name=name, description=description, goal=goal,
host_aggregate=host_aggregate, extra=extra)
return resp, body
@classmethod
def delete_audit_template(cls, uuid):
"""Deletes a audit_template having the specified UUID
:param uuid: The unique identifier of the audit template
:return: Server response
"""
resp, body = cls.client.delete_audit_template(uuid)
if uuid in cls.created_objects['audit_template']:
cls.created_objects['audit_template'].remove(uuid)
return resp

View File

@@ -0,0 +1,47 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tempest import test
from watcher_tempest_plugin.tests.api.admin import base
class TestApiDiscovery(base.BaseInfraOptimTest):
"""Tests for API discovery features."""
@test.attr(type='smoke')
def test_api_versions(self):
_, descr = self.client.get_api_description()
expected_versions = ('v1',)
versions = [version['id'] for version in descr['versions']]
for v in expected_versions:
self.assertIn(v, versions)
@test.attr(type='smoke')
def test_default_version(self):
_, descr = self.client.get_api_description()
default_version = descr['default_version']
self.assertEqual(default_version['id'], 'v1')
@test.attr(type='smoke')
def test_version_1_resources(self):
_, descr = self.client.get_version_description(version='v1')
expected_resources = ('audit_templates', 'audits', 'action_plans',
'actions', 'links', 'media_types')
for res in expected_resources:
self.assertIn(res, descr)

View File

@@ -0,0 +1,245 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
import uuid
from tempest import test
from tempest_lib import decorators
from tempest_lib import exceptions as lib_exc
from watcher_tempest_plugin.tests.api.admin import base
class TestCreateDeleteAuditTemplate(base.BaseInfraOptimTest):
"""Tests on audit templates"""
@test.attr(type='smoke')
def test_create_audit_template(self):
params = {'name': 'my at name %s' % uuid.uuid4(),
'description': 'my at description',
'host_aggregate': 12,
'goal': 'DUMMY',
'extra': {'str': 'value', 'int': 123, 'float': 0.123,
'bool': True, 'list': [1, 2, 3],
'dict': {'foo': 'bar'}}}
_, body = self.create_audit_template(**params)
self.assert_expected(params, body)
_, audit_template = self.client.show_audit_template(body['uuid'])
self.assert_expected(audit_template, body)
@test.attr(type='smoke')
def test_create_audit_template_unicode_description(self):
# Use a unicode string for testing:
params = {'name': 'my at name %s' % uuid.uuid4(),
'description': 'my àt déscrïptïôn',
'host_aggregate': 12,
'goal': 'DUMMY',
'extra': {'foo': 'bar'}}
_, body = self.create_audit_template(**params)
self.assert_expected(params, body)
_, audit_template = self.client.show_audit_template(body['uuid'])
self.assert_expected(audit_template, body)
@test.attr(type='smoke')
def test_delete_audit_template(self):
_, body = self.create_audit_template()
audit_uuid = body['uuid']
self.delete_audit_template(audit_uuid)
self.assertRaises(lib_exc.NotFound, self.client.show_audit_template,
audit_uuid)
class TestAuditTemplate(base.BaseInfraOptimTest):
"""Tests for audit_template."""
@classmethod
def resource_setup(cls):
super(TestAuditTemplate, cls).resource_setup()
_, cls.audit_template = cls.create_audit_template()
@test.attr(type='smoke')
def test_show_audit_template(self):
_, audit_template = self.client.show_audit_template(
self.audit_template['uuid'])
self.assert_expected(self.audit_template, audit_template)
@decorators.skip_because(bug="1510189")
@test.attr(type='smoke')
def test_filter_audit_template_by_goal(self):
_, audit_template = self.client.\
filter_audit_template_by_goal(self.audit_template['goal'])
self.assert_expected(self.audit_template,
audit_template['audit_templates'][0])
@decorators.skip_because(bug="1510189")
@test.attr(type='smoke')
def test_filter_audit_template_by_host_aggregate(self):
_, audit_template = self.client.\
filter_audit_template_by_host_aggregate(
self.audit_template['host_aggregate'])
self.assert_expected(self.audit_template,
audit_template['audit_templates'][0])
@test.attr(type='smoke')
def test_show_audit_template_with_links(self):
_, audit_template = self.client.show_audit_template(
self.audit_template['uuid'])
self.assertIn('links', audit_template.keys())
self.assertEqual(2, len(audit_template['links']))
self.assertIn(audit_template['uuid'],
audit_template['links'][0]['href'])
@test.attr(type="smoke")
def test_list_audit_templates(self):
_, body = self.client.list_audit_templates()
self.assertIn(self.audit_template['uuid'],
[i['uuid'] for i in body['audit_templates']])
# Verify self links.
for audit_template in body['audit_templates']:
self.validate_self_link('audit_templates', audit_template['uuid'],
audit_template['links'][0]['href'])
@test.attr(type='smoke')
def test_list_with_limit(self):
# We create 3 extra audit templates to exceed the limit we fix
for _ in range(3):
self.create_audit_template()
_, body = self.client.list_audit_templates(limit=3)
next_marker = body['audit_templates'][-1]['uuid']
self.assertEqual(len(body['audit_templates']), 3)
self.assertIn(next_marker, body['next'])
@test.attr(type='smoke')
def test_update_audit_template_replace(self):
params = {'name': 'my at name %s' % uuid.uuid4(),
'description': 'my at description',
'host_aggregate': 12,
'goal': 'DUMMY',
'extra': {'key1': 'value1', 'key2': 'value2'}}
_, body = self.create_audit_template(**params)
new_name = 'my at new name %s' % uuid.uuid4()
new_description = 'my new at description'
new_host_aggregate = 10
new_goal = 'A NEW GOAL'
new_extra = {'key1': 'new-value1', 'key2': 'new-value2'}
patch = [{'path': '/name',
'op': 'replace',
'value': new_name},
{'path': '/description',
'op': 'replace',
'value': new_description},
{'path': '/host_aggregate',
'op': 'replace',
'value': new_host_aggregate},
{'path': '/goal',
'op': 'replace',
'value': new_goal},
{'path': '/extra/key1',
'op': 'replace',
'value': new_extra['key1']},
{'path': '/extra/key2',
'op': 'replace',
'value': new_extra['key2']}]
self.client.update_audit_template(body['uuid'], patch)
_, body = self.client.show_audit_template(body['uuid'])
self.assertEqual(new_name, body['name'])
self.assertEqual(new_description, body['description'])
self.assertEqual(new_host_aggregate, body['host_aggregate'])
self.assertEqual(new_goal, body['goal'])
self.assertEqual(new_extra, body['extra'])
@test.attr(type='smoke')
def test_update_audit_template_remove(self):
extra = {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
description = 'my at description'
goal = 'DUMMY'
name = 'my at name %s' % uuid.uuid4()
params = {'name': name,
'description': description,
'host_aggregate': 12,
'goal': goal,
'extra': extra}
_, audit_template = self.create_audit_template(**params)
# Removing one item from the collection
self.client.update_audit_template(
audit_template['uuid'],
[{'path': '/extra/key2', 'op': 'remove'}])
extra.pop('key2')
_, body = self.client.show_audit_template(audit_template['uuid'])
self.assertEqual(extra, body['extra'])
# Removing the collection
self.client.update_audit_template(
audit_template['uuid'],
[{'path': '/extra', 'op': 'remove'}])
_, body = self.client.show_audit_template(audit_template['uuid'])
self.assertEqual({}, body['extra'])
# Removing the Host Aggregate ID
self.client.update_audit_template(
audit_template['uuid'],
[{'path': '/host_aggregate', 'op': 'remove'}])
_, body = self.client.show_audit_template(audit_template['uuid'])
self.assertEqual({}, body['extra'])
# Assert nothing else was changed
self.assertEqual(name, body['name'])
self.assertEqual(description, body['description'])
self.assertEqual(goal, body['goal'])
@test.attr(type='smoke')
def test_update_audit_template_add(self):
params = {'name': 'my at name %s' % uuid.uuid4(),
'description': 'my at description',
'host_aggregate': 12,
'goal': 'DUMMY'}
_, body = self.create_audit_template(**params)
extra = {'key1': 'value1', 'key2': 'value2'}
patch = [{'path': '/extra/key1',
'op': 'add',
'value': extra['key1']},
{'path': '/extra/key2',
'op': 'add',
'value': extra['key2']}]
self.client.update_audit_template(body['uuid'], patch)
_, body = self.client.show_audit_template(body['uuid'])
self.assertEqual(extra, body['extra'])