Files
watcher/watcher_tempest_plugin/tests/api/admin/base.py
Alexander Chadin 48cc6b2718 Add Audit Scope Handler
This patch set adds audit scope mechanism.
It also removes host_aggregate field.

Change-Id: Ia98ed180a93fc8c19599735e2b41471d322bae9a
Partially-Implements: blueprint define-the-audit-scope
2016-10-13 10:50:58 +03:00

251 lines
9.0 KiB
Python

# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from tempest.lib.common.utils import data_utils
from tempest import test
from watcher_tempest_plugin import infra_optim_clients as clients
class BaseInfraOptimTest(test.BaseTestCase):
"""Base class for Infrastructure Optimization API tests."""
# States where the object is waiting for some event to perform a transition
IDLE_STATES = ('RECOMMENDED', 'FAILED', 'SUCCEEDED', 'CANCELLED')
# States where the object can only be DELETED (end of its life-cycle)
FINISHED_STATES = ('FAILED', 'SUCCEEDED', 'CANCELLED')
@classmethod
def setup_credentials(cls):
super(BaseInfraOptimTest, cls).setup_credentials()
cls.mgr = clients.AdminManager()
@classmethod
def setup_clients(cls):
super(BaseInfraOptimTest, cls).setup_clients()
cls.client = cls.mgr.io_client
@classmethod
def resource_setup(cls):
super(BaseInfraOptimTest, cls).resource_setup()
# Set of all created audit templates UUIDs
cls.created_audit_templates = set()
# Set of all created audit UUIDs
cls.created_audits = set()
# Set of all created audit UUIDs. We use it to build the list of
# action plans to delete (including potential orphan one(s))
cls.created_action_plans_audit_uuids = set()
@classmethod
def resource_cleanup(cls):
"""Ensure that all created objects get destroyed."""
try:
action_plans_to_be_deleted = set()
# Phase 1: Make sure all objects are in an idle state
for audit_uuid in cls.created_audits:
test.call_until_true(
func=functools.partial(
cls.is_audit_idle, audit_uuid),
duration=30,
sleep_for=.5
)
for audit_uuid in cls.created_action_plans_audit_uuids:
_, action_plans = cls.client.list_action_plans(
audit_uuid=audit_uuid)
action_plans_to_be_deleted.update(
ap['uuid'] for ap in action_plans['action_plans'])
for action_plan in action_plans['action_plans']:
test.call_until_true(
func=functools.partial(
cls.is_action_plan_idle, action_plan['uuid']),
duration=30,
sleep_for=.5
)
# Phase 2: Delete them all
for action_plan_uuid in action_plans_to_be_deleted:
cls.delete_action_plan(action_plan_uuid)
for audit_uuid in cls.created_audits.copy():
cls.delete_audit(audit_uuid)
for audit_template_uuid in cls.created_audit_templates.copy():
cls.delete_audit_template(audit_template_uuid)
finally:
super(BaseInfraOptimTest, cls).resource_cleanup()
def validate_self_link(self, resource, uuid, link):
"""Check whether the given self link formatted correctly."""
expected_link = "{base}/{pref}/{res}/{uuid}".format(
base=self.client.base_url,
pref=self.client.URI_PREFIX,
res=resource,
uuid=uuid
)
self.assertEqual(expected_link, link)
def assert_expected(self, expected, actual,
keys=('created_at', 'updated_at', 'deleted_at')):
# Check if not expected keys/values exists in actual response body
for key, value in expected.items():
if key not in keys:
self.assertIn(key, actual)
self.assertEqual(value, actual[key])
# ### AUDIT TEMPLATES ### #
@classmethod
def create_audit_template(cls, goal, name=None, description=None,
strategy=None, extra=None):
"""Wrapper utility for creating a test audit template
:param goal: Goal UUID or name related to the audit template.
:param name: The name of the audit template. Default: My Audit Template
:param description: The description of the audit template.
:param strategy: Strategy UUID or name related to the audit template.
:param extra: Metadata associated to this audit template.
:return: A tuple with The HTTP response and its body
"""
description = description or data_utils.rand_name(
'test-audit_template')
resp, body = cls.client.create_audit_template(
name=name, description=description, goal=goal,
strategy=strategy, extra=extra)
cls.created_audit_templates.add(body['uuid'])
return resp, body
@classmethod
def delete_audit_template(cls, uuid):
"""Deletes a audit_template having the specified UUID
:param uuid: The unique identifier of the audit template
:return: Server response
"""
resp, _ = cls.client.delete_audit_template(uuid)
if uuid in cls.created_audit_templates:
cls.created_audit_templates.remove(uuid)
return resp
# ### AUDITS ### #
@classmethod
def create_audit(cls, audit_template_uuid, audit_type='ONESHOT',
state=None, deadline=None, interval=None):
"""Wrapper utility for creating a test audit
:param audit_template_uuid: Audit Template UUID this audit will use
:param type: Audit type (either ONESHOT or CONTINUOUS)
:param state: Audit state (str)
:param deadline: Audit deadline (datetime)
:param interval: Audit interval in seconds (int)
:return: A tuple with The HTTP response and its body
"""
resp, body = cls.client.create_audit(
audit_template_uuid=audit_template_uuid, audit_type=audit_type,
state=state, deadline=deadline, interval=interval)
cls.created_audits.add(body['uuid'])
cls.created_action_plans_audit_uuids.add(body['uuid'])
return resp, body
@classmethod
def delete_audit(cls, audit_uuid):
"""Deletes an audit having the specified UUID
:param audit_uuid: The unique identifier of the audit.
:return: the HTTP response
"""
resp, _ = cls.client.delete_audit(audit_uuid)
if audit_uuid in cls.created_audits:
cls.created_audits.remove(audit_uuid)
return resp
@classmethod
def has_audit_succeeded(cls, audit_uuid):
_, audit = cls.client.show_audit(audit_uuid)
return audit.get('state') == 'SUCCEEDED'
@classmethod
def has_audit_finished(cls, audit_uuid):
_, audit = cls.client.show_audit(audit_uuid)
return audit.get('state') in cls.FINISHED_STATES
@classmethod
def is_audit_idle(cls, audit_uuid):
_, audit = cls.client.show_audit(audit_uuid)
return audit.get('state') in cls.IDLE_STATES
# ### ACTION PLANS ### #
@classmethod
def create_action_plan(cls, audit_template_uuid, **audit_kwargs):
"""Wrapper utility for creating a test action plan
:param audit_template_uuid: Audit template UUID to use
:param audit_kwargs: Dict of audit properties to set
:return: The action plan as dict
"""
_, audit = cls.create_audit(audit_template_uuid, **audit_kwargs)
audit_uuid = audit['uuid']
assert test.call_until_true(
func=functools.partial(cls.has_audit_succeeded, audit_uuid),
duration=30,
sleep_for=.5
)
_, action_plans = cls.client.list_action_plans(audit_uuid=audit_uuid)
return action_plans['action_plans'][0]
@classmethod
def delete_action_plan(cls, action_plan_uuid):
"""Deletes an action plan having the specified UUID
:param action_plan_uuid: The unique identifier of the action plan.
:return: the HTTP response
"""
resp, _ = cls.client.delete_action_plan(action_plan_uuid)
if action_plan_uuid in cls.created_action_plans_audit_uuids:
cls.created_action_plans_audit_uuids.remove(action_plan_uuid)
return resp
@classmethod
def has_action_plan_finished(cls, action_plan_uuid):
_, action_plan = cls.client.show_action_plan(action_plan_uuid)
return action_plan.get('state') in cls.FINISHED_STATES
@classmethod
def is_action_plan_idle(cls, action_plan_uuid):
"""This guard makes sure your action plan is not running"""
_, action_plan = cls.client.show_action_plan(action_plan_uuid)
return action_plan.get('state') in cls.IDLE_STATES