Files
watcher/watcher_tempest_plugin/tests/scenario/base.py
David TARDIVEL a48a16596f Add action plan SUPERSEDED state
An action plan can be now go to SUPERSEDED state if another
action plan is on going for instance. It could be the case with
tempest test, because we can have several scenario tests running
at the same time.

Change-Id: I5449e545676463f0ea601e67993e9dffa67929a7
Closes-bug: #1657031
2017-01-17 09:41:19 +01:00

160 lines
5.3 KiB
Python

# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import unicode_literals
import time
from oslo_log import log
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import data_utils
from tempest.scenario import manager
from watcher_tempest_plugin import infra_optim_clients as clients
LOG = log.getLogger(__name__)
CONF = config.CONF
class BaseInfraOptimScenarioTest(manager.ScenarioTest):
"""Base class for Infrastructure Optimization API tests."""
@classmethod
def setup_credentials(cls):
cls._check_network_config()
super(BaseInfraOptimScenarioTest, cls).setup_credentials()
cls.mgr = clients.AdminManager()
@classmethod
def setup_clients(cls):
super(BaseInfraOptimScenarioTest, cls).setup_clients()
cls.client = cls.mgr.io_client
@classmethod
def resource_setup(cls):
super(BaseInfraOptimScenarioTest, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
"""Ensure that all created objects get destroyed."""
super(BaseInfraOptimScenarioTest, cls).resource_cleanup()
@classmethod
def wait_for(cls, condition, timeout=30):
start_time = time.time()
while time.time() - start_time < timeout:
if condition():
break
time.sleep(.5)
@classmethod
def _check_network_config(cls):
if not CONF.network.public_network_id:
msg = 'public network not defined.'
LOG.error(msg)
raise exceptions.InvalidConfiguration(msg)
# ### AUDIT TEMPLATES ### #
def create_audit_template(self, goal, name=None, description=None,
strategy=None):
"""Wrapper utility for creating a test audit template
:param goal: Goal UUID or name related to the audit template.
:param name: The name of the audit template. Default: My Audit Template
:param description: The description of the audit template.
:param strategy: Strategy UUID or name related to the audit template.
:return: A tuple with The HTTP response and its body
"""
description = description or data_utils.rand_name(
'test-audit_template')
resp, body = self.client.create_audit_template(
name=name, description=description, goal=goal, strategy=strategy)
self.addCleanup(
self.delete_audit_template,
audit_template_uuid=body["uuid"]
)
return resp, body
def delete_audit_template(self, audit_template_uuid):
"""Deletes a audit_template having the specified UUID
:param audit_template_uuid: The unique identifier of the audit template
:return: Server response
"""
resp, _ = self.client.delete_audit_template(audit_template_uuid)
return resp
# ### AUDITS ### #
def create_audit(self, audit_template_uuid, audit_type='ONESHOT',
state=None):
"""Wrapper utility for creating a test audit
:param audit_template_uuid: Audit Template UUID this audit will use
:param type: Audit type (either ONESHOT or CONTINUOUS)
:return: A tuple with The HTTP response and its body
"""
resp, body = self.client.create_audit(
audit_template_uuid=audit_template_uuid, audit_type=audit_type,
state=state)
self.addCleanup(self.delete_audit, audit_uuid=body["uuid"])
return resp, body
def delete_audit(self, audit_uuid):
"""Deletes an audit having the specified UUID
:param audit_uuid: The unique identifier of the audit.
:return: the HTTP response
"""
_, action_plans = self.client.list_action_plans(audit_uuid=audit_uuid)
for action_plan in action_plans.get("action_plans", []):
self.delete_action_plan(action_plan_uuid=action_plan["uuid"])
resp, _ = self.client.delete_audit(audit_uuid)
return resp
def has_audit_succeeded(self, audit_uuid):
_, audit = self.client.show_audit(audit_uuid)
if audit.get('state') in ('FAILED', 'CANCELLED'):
raise ValueError()
return audit.get('state') == 'SUCCEEDED'
# ### ACTION PLANS ### #
def delete_action_plan(self, action_plan_uuid):
"""Deletes an action plan having the specified UUID
:param action_plan_uuid: The unique identifier of the action plan.
:return: the HTTP response
"""
resp, _ = self.client.delete_action_plan(action_plan_uuid)
return resp
def has_action_plan_finished(self, action_plan_uuid):
_, action_plan = self.client.show_action_plan(action_plan_uuid)
return action_plan.get('state') in ('FAILED', 'SUCCEEDED', 'CANCELLED',
'SUPERSEDED')