# -*- encoding: utf-8 -*- # Copyright (c) 2016 b<>com # # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. # from __future__ import unicode_literals import time from oslo_log import log from tempest import config from tempest import exceptions from tempest.lib.common.utils import data_utils from watcher_tempest_plugin import infra_optim_clients as clients from watcher_tempest_plugin.tests.scenario import manager LOG = log.getLogger(__name__) CONF = config.CONF class BaseInfraOptimScenarioTest(manager.ScenarioTest): """Base class for Infrastructure Optimization API tests.""" # States where the object is waiting for some event to perform a transition IDLE_STATES = ('RECOMMENDED', 'FAILED', 'SUCCEEDED', 'CANCELLED') # States where the object can only be DELETED (end of its life-cycle) FINISHED_STATES = ('FAILED', 'SUCCEEDED', 'CANCELLED', 'SUPERSEDED') @classmethod def setup_credentials(cls): cls._check_network_config() super(BaseInfraOptimScenarioTest, cls).setup_credentials() cls.mgr = clients.AdminManager() @classmethod def setup_clients(cls): super(BaseInfraOptimScenarioTest, cls).setup_clients() cls.client = cls.mgr.io_client @classmethod def resource_setup(cls): super(BaseInfraOptimScenarioTest, cls).resource_setup() @classmethod def resource_cleanup(cls): """Ensure that all created objects get destroyed.""" super(BaseInfraOptimScenarioTest, cls).resource_cleanup() @classmethod def wait_for(cls, condition, timeout=30): start_time = time.time() while time.time() - start_time < timeout: if condition(): break time.sleep(.5) @classmethod def _check_network_config(cls): if not CONF.network.public_network_id: msg = 'public network not defined.' LOG.error(msg) raise exceptions.InvalidConfiguration(msg) # ### AUDIT TEMPLATES ### # def create_audit_template(self, goal, name=None, description=None, strategy=None): """Wrapper utility for creating a test audit template :param goal: Goal UUID or name related to the audit template. :param name: The name of the audit template. Default: My Audit Template :param description: The description of the audit template. :param strategy: Strategy UUID or name related to the audit template. :return: A tuple with The HTTP response and its body """ description = description or data_utils.rand_name( 'test-audit_template') resp, body = self.client.create_audit_template( name=name, description=description, goal=goal, strategy=strategy) self.addCleanup( self.delete_audit_template, audit_template_uuid=body["uuid"] ) return resp, body def delete_audit_template(self, audit_template_uuid): """Deletes a audit_template having the specified UUID :param audit_template_uuid: The unique identifier of the audit template :return: Server response """ resp, _ = self.client.delete_audit_template(audit_template_uuid) return resp # ### AUDITS ### # def create_audit(self, audit_template_uuid, audit_type='ONESHOT', state=None, parameters=None): """Wrapper utility for creating a test audit :param audit_template_uuid: Audit Template UUID this audit will use :param audit_type: Audit type (either ONESHOT or CONTINUOUS) :param state: Audit state :param parameters: Input parameters of the audit :return: A tuple with The HTTP response and its body """ resp, body = self.client.create_audit( audit_template_uuid=audit_template_uuid, audit_type=audit_type, state=state, parameters=parameters) self.addCleanup(self.delete_audit, audit_uuid=body["uuid"]) return resp, body def delete_audit(self, audit_uuid): """Deletes an audit having the specified UUID :param audit_uuid: The unique identifier of the audit. :return: the HTTP response """ _, action_plans = self.client.list_action_plans(audit_uuid=audit_uuid) for action_plan in action_plans.get("action_plans", []): self.delete_action_plan(action_plan_uuid=action_plan["uuid"]) resp, _ = self.client.delete_audit(audit_uuid) return resp def has_audit_succeeded(self, audit_uuid): _, audit = self.client.show_audit(audit_uuid) if audit.get('state') in ('FAILED', 'CANCELLED'): raise ValueError() return audit.get('state') == 'SUCCEEDED' @classmethod def has_audit_finished(cls, audit_uuid): _, audit = cls.client.show_audit(audit_uuid) return audit.get('state') in cls.FINISHED_STATES # ### ACTION PLANS ### # def delete_action_plan(self, action_plan_uuid): """Deletes an action plan having the specified UUID :param action_plan_uuid: The unique identifier of the action plan. :return: the HTTP response """ resp, _ = self.client.delete_action_plan(action_plan_uuid) return resp def has_action_plan_finished(self, action_plan_uuid): _, action_plan = self.client.show_action_plan(action_plan_uuid) return action_plan.get('state') in ('FAILED', 'SUCCEEDED', 'CANCELLED', 'SUPERSEDED')