Added Strategy model

In this changeset, I add the Strategy model as well as the DB
functionalities we need to manipulate strategies.

This changeset implies a DB schema update.

Partially Implements: blueprint get-goal-from-strategy

Change-Id: I438a8788844fbc514edfe1e9e3136f46ba5a82f2
This commit is contained in:
Vincent Françoise
2016-04-29 17:44:47 +02:00
parent 3b5ef15db6
commit 192d8e262c
10 changed files with 785 additions and 4 deletions

View File

@@ -151,7 +151,7 @@ class InvalidIdentity(Invalid):
class InvalidGoal(Invalid):
msg_fmt = _("Goal %(goal)s is not defined in Watcher configuration file")
msg_fmt = _("Goal %(goal)s is invalid")
class InvalidUUID(Invalid):
@@ -174,6 +174,14 @@ class GoalAlreadyExists(Conflict):
msg_fmt = _("A goal with UUID %(uuid)s already exists")
class StrategyNotFound(ResourceNotFound):
msg_fmt = _("Strategy %(strategy)s could not be found")
class StrategyAlreadyExists(Conflict):
msg_fmt = _("A strategy with UUID %(uuid)s already exists")
class AuditTemplateNotFound(ResourceNotFound):
msg_fmt = _("AuditTemplate %(audit_template)s could not be found")

View File

@@ -130,6 +130,96 @@ class BaseConnection(object):
:raises: :py:class:`~.Invalid`
"""
@abc.abstractmethod
def get_strategy_list(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
"""Get specific columns for matching strategies.
Return a list of the specified columns for all strategies that
match the specified filters.
:param context: The security context
:param columns: List of column names to return.
Defaults to 'id' column when columns == None.
:param filters: Filters to apply. Defaults to None.
:param limit: Maximum number of strategies to return.
:param marker: The last item of the previous page; we return the next
result set.
:param sort_key: Attribute by which results should be sorted.
:param sort_dir: Direction in which results should be sorted.
(asc, desc)
:returns: A list of tuples of the specified columns.
"""
@abc.abstractmethod
def create_strategy(self, values):
"""Create a new strategy.
:param values: A dict containing items used to identify
and track the strategy. For example:
::
{
'id': 1,
'uuid': utils.generate_uuid(),
'name': 'my_strategy',
'display_name': 'My strategy',
'goal_uuid': utils.generate_uuid(),
}
:returns: A strategy
:raises: :py:class:`~.StrategyAlreadyExists`
"""
@abc.abstractmethod
def get_strategy_by_id(self, context, strategy_id):
"""Return a strategy given its ID.
:param context: The security context
:param strategy_id: The ID of a strategy
:returns: A strategy
:raises: :py:class:`~.StrategyNotFound`
"""
@abc.abstractmethod
def get_strategy_by_uuid(self, context, strategy_uuid):
"""Return a strategy given its UUID.
:param context: The security context
:param strategy_uuid: The UUID of a strategy
:returns: A strategy
:raises: :py:class:`~.StrategyNotFound`
"""
@abc.abstractmethod
def get_strategy_by_name(self, context, strategy_name):
"""Return a strategy given its name.
:param context: The security context
:param strategy_name: The name of a strategy
:returns: A strategy
:raises: :py:class:`~.StrategyNotFound`
"""
@abc.abstractmethod
def destroy_strategy(self, strategy_uuid):
"""Destroy a strategy.
:param strategy_uuid: The UUID of a strategy
:raises: :py:class:`~.StrategyNotFound`
"""
@abc.abstractmethod
def update_strategy(self, strategy_uuid, values):
"""Update properties of a strategy.
:param strategy_uuid: The UUID of a strategy
:returns: A strategy
:raises: :py:class:`~.StrategyNotFound`
:raises: :py:class:`~.Invalid`
"""
@abc.abstractmethod
def get_audit_template_list(self, context, columns=None, filters=None,
limit=None, marker=None, sort_key=None,

View File

@@ -270,6 +270,14 @@ class Connection(api.BaseConnection):
query=query, model=models.Goal, filters=filters,
plain_fields=plain_fields)
def _add_strategies_filters(self, query, filters):
plain_fields = ['uuid', 'name', 'display_name', 'goal_id']
join_fieldmap = {'goal_uuid': models.Goal}
return self._add_filters(
query=query, model=models.Strategy, filters=filters,
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
def _add_audit_templates_filters(self, query, filters):
if filters is None:
filters = []
@@ -441,6 +449,72 @@ class Connection(api.BaseConnection):
except exception.ResourceNotFound:
raise exception.GoalNotFound(goal=goal_id)
# ### STRATEGIES ### #
def get_strategy_list(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
query = model_query(models.Strategy)
query = self._add_strategies_filters(query, filters)
if not context.show_deleted:
query = query.filter_by(deleted_at=None)
return _paginate_query(models.Strategy, limit, marker,
sort_key, sort_dir, query)
def create_strategy(self, values):
# ensure defaults are present for new strategies
if not values.get('uuid'):
values['uuid'] = utils.generate_uuid()
strategy = models.Strategy()
strategy.update(values)
try:
strategy.save()
except db_exc.DBDuplicateEntry:
raise exception.StrategyAlreadyExists(uuid=values['uuid'])
return strategy
def _get_strategy(self, context, fieldname, value):
try:
return self._get(context, model=models.Strategy,
fieldname=fieldname, value=value)
except exception.ResourceNotFound:
raise exception.StrategyNotFound(strategy=value)
def get_strategy_by_id(self, context, strategy_id):
return self._get_strategy(context, fieldname="id", value=strategy_id)
def get_strategy_by_uuid(self, context, strategy_uuid):
return self._get_strategy(
context, fieldname="uuid", value=strategy_uuid)
def get_strategy_by_name(self, context, strategy_name):
return self._get_strategy(
context, fieldname="name", value=strategy_name)
def destroy_strategy(self, strategy_id):
try:
return self._destroy(models.Strategy, strategy_id)
except exception.ResourceNotFound:
raise exception.StrategyNotFound(strategy=strategy_id)
def update_strategy(self, strategy_id, values):
if 'uuid' in values:
raise exception.Invalid(
message=_("Cannot overwrite UUID for an existing Strategy."))
try:
return self._update(models.Strategy, strategy_id, values)
except exception.ResourceNotFound:
raise exception.StrategyNotFound(strategy=strategy_id)
def soft_delete_strategy(self, strategy_id):
try:
self._soft_delete(models.Strategy, strategy_id)
except exception.ResourceNotFound:
raise exception.GoalNotFound(strategy=strategy_id)
# ### AUDIT TEMPLATES ### #
def get_audit_template_list(self, context, filters=None, limit=None,

View File

@@ -110,6 +110,21 @@ class WatcherBase(models.SoftDeleteMixin,
Base = declarative_base(cls=WatcherBase)
class Strategy(Base):
"""Represents a strategy."""
__tablename__ = 'strategies'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_strategies0uuid'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36))
name = Column(String(63), nullable=False)
display_name = Column(String(63), nullable=False)
goal_id = Column(Integer, ForeignKey('goals.id'), nullable=False)
class Goal(Base):
"""Represents a goal."""

View File

@@ -18,11 +18,14 @@ from watcher.objects import action_plan
from watcher.objects import audit
from watcher.objects import audit_template
from watcher.objects import goal
from watcher.objects import strategy
Audit = audit.Audit
AuditTemplate = audit_template.AuditTemplate
Action = action.Action
ActionPlan = action_plan.ActionPlan
Goal = goal.Goal
Strategy = strategy.Strategy
__all__ = ("Audit", "AuditTemplate", "Action", "ActionPlan", "Goal")
__all__ = ("Audit", "AuditTemplate", "Action",
"ActionPlan", "Goal", "Strategy")

View File

@@ -343,7 +343,7 @@ class WatcherObject(object):
@property
def obj_fields(self):
return self.fields.keys() + self.obj_extra_fields
return list(self.fields.keys()) + self.obj_extra_fields
# dictish syntactic sugar
def iteritems(self):

222
watcher/objects/strategy.py Normal file
View File

@@ -0,0 +1,222 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.common import exception
from watcher.common import utils
from watcher.db import api as dbapi
from watcher.objects import base
from watcher.objects import utils as obj_utils
class Strategy(base.WatcherObject):
dbapi = dbapi.get_instance()
fields = {
'id': int,
'uuid': obj_utils.str_or_none,
'name': obj_utils.str_or_none,
'display_name': obj_utils.str_or_none,
'goal_id': obj_utils.int_or_none,
}
@staticmethod
def _from_db_object(strategy, db_strategy):
"""Converts a database entity to a formal object."""
for field in strategy.fields:
strategy[field] = db_strategy[field]
strategy.obj_reset_changes()
return strategy
@staticmethod
def _from_db_object_list(db_objects, cls, context):
"""Converts a list of database entities to a list of formal objects."""
return [Strategy._from_db_object(cls(context), obj)
for obj in db_objects]
@classmethod
def get(cls, context, strategy_id):
"""Find a strategy based on its id or uuid
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
:param strategy_id: the id *or* uuid of a strategy.
:returns: a :class:`Strategy` object.
"""
if utils.is_int_like(strategy_id):
return cls.get_by_id(context, strategy_id)
elif utils.is_uuid_like(strategy_id):
return cls.get_by_uuid(context, strategy_id)
else:
raise exception.InvalidIdentity(identity=strategy_id)
@classmethod
def get_by_id(cls, context, strategy_id):
"""Find a strategy based on its integer id
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
:param strategy_id: the id of a strategy.
:returns: a :class:`Strategy` object.
"""
db_strategy = cls.dbapi.get_strategy_by_id(context, strategy_id)
strategy = Strategy._from_db_object(cls(context), db_strategy)
return strategy
@classmethod
def get_by_uuid(cls, context, uuid):
"""Find a strategy based on uuid
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
:param uuid: the uuid of a strategy.
:returns: a :class:`Strategy` object.
"""
db_strategy = cls.dbapi.get_strategy_by_uuid(context, uuid)
strategy = cls._from_db_object(cls(context), db_strategy)
return strategy
@classmethod
def get_by_name(cls, context, name):
"""Find a strategy based on name
:param name: the name of a strategy.
:param context: Security context
:returns: a :class:`Strategy` object.
"""
db_strategy = cls.dbapi.get_strategy_by_name(context, name)
strategy = cls._from_db_object(cls(context), db_strategy)
return strategy
@classmethod
def list(cls, context, limit=None, marker=None, filters=None,
sort_key=None, sort_dir=None):
"""Return a list of :class:`Strategy` objects.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
:param filters: dict mapping the filter key to a value.
:param limit: maximum number of resources to return in a single result.
:param marker: pagination marker for large data sets.
:param sort_key: column to sort results by.
:param sort_dir: direction to sort. "asc" or "desc".
:returns: a list of :class:`Strategy` object.
"""
db_strategies = cls.dbapi.get_strategy_list(
context,
filters=filters,
limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir)
return Strategy._from_db_object_list(db_strategies, cls, context)
def create(self, context=None):
"""Create a :class:`Strategy` record in the DB.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
"""
values = self.obj_get_changes()
db_strategy = self.dbapi.create_strategy(values)
self._from_db_object(self, db_strategy)
def destroy(self, context=None):
"""Delete the :class:`Strategy` from the DB.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
"""
self.dbapi.destroy_strategy(self.id)
self.obj_reset_changes()
def save(self, context=None):
"""Save updates to this :class:`Strategy`.
Updates will be made column by column based on the result
of self.what_changed().
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
"""
updates = self.obj_get_changes()
self.dbapi.update_strategy(self.id, updates)
self.obj_reset_changes()
def refresh(self, context=None):
"""Loads updates for this :class:`Strategy`.
Loads a strategy with the same uuid from the database and
checks for updated attributes. Updates are applied from
the loaded strategy column by column, if there are any updates.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
"""
current = self.__class__.get_by_id(self._context, strategy_id=self.id)
for field in self.fields:
if (hasattr(self, base.get_attrname(field)) and
self[field] != current[field]):
self[field] = current[field]
def soft_delete(self, context=None):
"""Soft Delete the :class:`Strategy` from the DB.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Strategy(context)
"""
self.dbapi.soft_delete_strategy(self.id)

View File

@@ -0,0 +1,344 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for manipulating Strategy via the DB API"""
import freezegun
import six
from watcher.common import exception
from watcher.common import utils as w_utils
from watcher.tests.db import base
from watcher.tests.db import utils
class TestDbStrategyFilters(base.DbTestCase):
FAKE_OLDER_DATE = '2014-01-01T09:52:05.219414'
FAKE_OLD_DATE = '2015-01-01T09:52:05.219414'
FAKE_TODAY = '2016-02-24T09:52:05.219414'
def setUp(self):
super(TestDbStrategyFilters, self).setUp()
self.context.show_deleted = True
self._data_setup()
def _data_setup(self):
strategy1_name = "STRATEGY_ID_1"
strategy2_name = "STRATEGY_ID_2"
strategy3_name = "STRATEGY_ID_3"
self.goal1 = utils.create_test_goal(
id=1, uuid=w_utils.generate_uuid(),
name="GOAL_ID", display_name="Goal")
self.goal2 = utils.create_test_goal(
id=2, uuid=w_utils.generate_uuid(),
name="DUMMY", display_name="Dummy")
with freezegun.freeze_time(self.FAKE_TODAY):
self.strategy1 = utils.create_test_strategy(
id=1, uuid=w_utils.generate_uuid(),
name=strategy1_name, display_name="Strategy 1",
goal_id=self.goal1.id)
with freezegun.freeze_time(self.FAKE_OLD_DATE):
self.strategy2 = utils.create_test_strategy(
id=2, uuid=w_utils.generate_uuid(),
name=strategy2_name, display_name="Strategy 2",
goal_id=self.goal1.id)
with freezegun.freeze_time(self.FAKE_OLDER_DATE):
self.strategy3 = utils.create_test_strategy(
id=3, uuid=w_utils.generate_uuid(),
name=strategy3_name, display_name="Strategy 3",
goal_id=self.goal2.id)
def _soft_delete_strategys(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.soft_delete_strategy(self.strategy1.id)
with freezegun.freeze_time(self.FAKE_OLD_DATE):
self.dbapi.soft_delete_strategy(self.strategy2.id)
with freezegun.freeze_time(self.FAKE_OLDER_DATE):
self.dbapi.soft_delete_strategy(self.strategy3.id)
def _update_strategies(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.update_strategy(
self.strategy1.id, values={"display_name": "strategy1"})
with freezegun.freeze_time(self.FAKE_OLD_DATE):
self.dbapi.update_strategy(
self.strategy2.id, values={"display_name": "strategy2"})
with freezegun.freeze_time(self.FAKE_OLDER_DATE):
self.dbapi.update_strategy(
self.strategy3.id, values={"display_name": "strategy3"})
def test_get_strategy_list_filter_deleted_true(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.soft_delete_strategy(self.strategy1.id)
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted': True})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_deleted_false(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.soft_delete_strategy(self.strategy1.id)
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted': False})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_deleted_at_eq(self):
self._soft_delete_strategys()
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted_at__eq': self.FAKE_TODAY})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_deleted_at_lt(self):
self._soft_delete_strategys()
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted_at__lt': self.FAKE_TODAY})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_deleted_at_lte(self):
self._soft_delete_strategys()
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted_at__lte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_deleted_at_gt(self):
self._soft_delete_strategys()
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted_at__gt': self.FAKE_OLD_DATE})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_deleted_at_gte(self):
self._soft_delete_strategys()
res = self.dbapi.get_strategy_list(
self.context, filters={'deleted_at__gte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.strategy1['uuid'], self.strategy2['uuid']]),
set([r.uuid for r in res]))
# created_at #
def test_get_strategy_list_filter_created_at_eq(self):
res = self.dbapi.get_strategy_list(
self.context, filters={'created_at__eq': self.FAKE_TODAY})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_created_at_lt(self):
res = self.dbapi.get_strategy_list(
self.context, filters={'created_at__lt': self.FAKE_TODAY})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_created_at_lte(self):
res = self.dbapi.get_strategy_list(
self.context, filters={'created_at__lte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_created_at_gt(self):
res = self.dbapi.get_strategy_list(
self.context, filters={'created_at__gt': self.FAKE_OLD_DATE})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_created_at_gte(self):
res = self.dbapi.get_strategy_list(
self.context, filters={'created_at__gte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.strategy1['uuid'], self.strategy2['uuid']]),
set([r.uuid for r in res]))
# updated_at #
def test_get_strategy_list_filter_updated_at_eq(self):
self._update_strategies()
res = self.dbapi.get_strategy_list(
self.context, filters={'updated_at__eq': self.FAKE_TODAY})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_updated_at_lt(self):
self._update_strategies()
res = self.dbapi.get_strategy_list(
self.context, filters={'updated_at__lt': self.FAKE_TODAY})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_updated_at_lte(self):
self._update_strategies()
res = self.dbapi.get_strategy_list(
self.context, filters={'updated_at__lte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.strategy2['uuid'], self.strategy3['uuid']]),
set([r.uuid for r in res]))
def test_get_strategy_list_filter_updated_at_gt(self):
self._update_strategies()
res = self.dbapi.get_strategy_list(
self.context, filters={'updated_at__gt': self.FAKE_OLD_DATE})
self.assertEqual([self.strategy1['uuid']], [r.uuid for r in res])
def test_get_strategy_list_filter_updated_at_gte(self):
self._update_strategies()
res = self.dbapi.get_strategy_list(
self.context, filters={'updated_at__gte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.strategy1['uuid'], self.strategy2['uuid']]),
set([r.uuid for r in res]))
class DbStrategyTestCase(base.DbTestCase):
def _create_test_strategy(self, **kwargs):
strategy = utils.get_test_strategy(**kwargs)
self.dbapi.create_strategy(strategy)
return strategy
def test_get_strategy_list(self):
ids = []
for i in range(1, 6):
strategy = utils.create_test_strategy(
id=i,
uuid=w_utils.generate_uuid(),
name="STRATEGY_ID_%s" % i,
display_name='My Strategy {0}'.format(i))
ids.append(six.text_type(strategy['uuid']))
res = self.dbapi.get_strategy_list(self.context)
res_ids = [r.display_name for r in res]
self.assertEqual(ids.sort(), res_ids.sort())
def test_get_strategy_list_with_filters(self):
strategy1 = self._create_test_strategy(
id=1,
uuid=w_utils.generate_uuid(),
name="STRATEGY_ID_1",
display_name='Strategy 1',
)
strategy2 = self._create_test_strategy(
id=2,
uuid=w_utils.generate_uuid(),
name="STRATEGY_ID_2",
display_name='Strategy 2',
)
res = self.dbapi.get_strategy_list(
self.context, filters={'display_name': 'Strategy 1'})
self.assertEqual([strategy1['uuid']], [r.uuid for r in res])
res = self.dbapi.get_strategy_list(
self.context, filters={'display_name': 'Strategy 3'})
self.assertEqual([], [r.uuid for r in res])
res = self.dbapi.get_strategy_list(
self.context,
filters={'goal_id': 1})
self.assertEqual([strategy1['uuid'], strategy2['uuid']],
[r.uuid for r in res])
res = self.dbapi.get_strategy_list(
self.context,
filters={'display_name': 'Strategy 2'})
self.assertEqual([strategy2['uuid']], [r.uuid for r in res])
def test_get_strategy_by_uuid(self):
created_strategy = self._create_test_strategy()
strategy = self.dbapi.get_strategy_by_uuid(
self.context, created_strategy['uuid'])
self.assertEqual(strategy.uuid, created_strategy['uuid'])
def test_get_strategy_by_name(self):
created_strategy = self._create_test_strategy()
strategy = self.dbapi.get_strategy_by_name(
self.context, created_strategy['name'])
self.assertEqual(strategy.name, created_strategy['name'])
def test_get_strategy_that_does_not_exist(self):
self.assertRaises(exception.StrategyNotFound,
self.dbapi.get_strategy_by_id,
self.context, 404)
def test_update_strategy(self):
strategy = self._create_test_strategy()
res = self.dbapi.update_strategy(
strategy['uuid'], {'display_name': 'updated-model'})
self.assertEqual('updated-model', res.display_name)
def test_update_goal_id(self):
strategy = self._create_test_strategy()
self.assertRaises(exception.Invalid,
self.dbapi.update_strategy, strategy['uuid'],
{'uuid': 'new_strategy_id'})
def test_update_strategy_that_does_not_exist(self):
self.assertRaises(exception.StrategyNotFound,
self.dbapi.update_strategy,
404,
{'display_name': ''})
def test_destroy_strategy(self):
strategy = self._create_test_strategy()
self.dbapi.destroy_strategy(strategy['uuid'])
self.assertRaises(exception.StrategyNotFound,
self.dbapi.get_strategy_by_id,
self.context, strategy['uuid'])
def test_destroy_strategy_that_does_not_exist(self):
self.assertRaises(exception.StrategyNotFound,
self.dbapi.destroy_strategy, 404)
def test_create_strategy_already_exists(self):
strategy_id = "STRATEGY_ID"
self._create_test_strategy(name=strategy_id)
self.assertRaises(exception.StrategyAlreadyExists,
self._create_test_strategy,
name=strategy_id)

View File

@@ -163,3 +163,28 @@ def create_test_goal(**kwargs):
goal = get_test_goal(**kwargs)
dbapi = db_api.get_instance()
return dbapi.create_goal(goal)
def get_test_strategy(**kwargs):
return {
'id': kwargs.get('id', 1),
'uuid': kwargs.get('uuid', 'cb3d0b58-4415-4d90-b75b-1e96878730e3'),
'name': kwargs.get('name', 'TEST'),
'display_name': kwargs.get('display_name', 'test strategy'),
'goal_id': kwargs.get('goal_id', 1),
'created_at': kwargs.get('created_at'),
'updated_at': kwargs.get('updated_at'),
'deleted_at': kwargs.get('deleted_at'),
}
def create_test_strategy(**kwargs):
"""Create test strategy entry in DB and return Strategy DB object.
Function to be used to create test Strategy objects in the database.
:param kwargs: kwargs with overriding values for strategy's attributes.
:returns: Test Strategy DB object.
"""
strategy = get_test_strategy(**kwargs)
dbapi = db_api.get_instance()
return dbapi.create_strategy(strategy)

View File

@@ -41,7 +41,7 @@ class TestDefaultAuditHandler(base.DbTestCase):
audit_handler.execute(self.audit.uuid, self.context)
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
def test_trigger_audit_state_success(self, mock_collector):
def test_trigger_audit_state_succeeded(self, mock_collector):
mock_collector.return_value = faker.FakerModelCollector()
audit_handler = default.DefaultAuditHandler(mock.MagicMock())
audit_handler.execute(self.audit.uuid, self.context)