Compare commits
9 Commits
master
...
stable/202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f8d537330 | ||
|
|
6264f17c92 | ||
|
|
4fca7d246a | ||
|
|
64ba589f80 | ||
|
|
accc7a2a22 | ||
|
|
70d92a75cc | ||
|
|
f5ef5557ab | ||
|
|
c604e00b4d | ||
|
|
6b433b3547 |
@@ -2,3 +2,4 @@
|
|||||||
host=review.opendev.org
|
host=review.opendev.org
|
||||||
port=29418
|
port=29418
|
||||||
project=openstack/watcher.git
|
project=openstack/watcher.git
|
||||||
|
defaultbranch=stable/2024.1
|
||||||
|
|||||||
@@ -85,6 +85,7 @@
|
|||||||
vars:
|
vars:
|
||||||
tempest_concurrency: 1
|
tempest_concurrency: 1
|
||||||
tempest_test_regex: watcher_tempest_plugin.tests.scenario.test_execute_strategies
|
tempest_test_regex: watcher_tempest_plugin.tests.scenario.test_execute_strategies
|
||||||
|
tempest_exclude_regex: .*\[.*\breal_load\b.*\].*
|
||||||
|
|
||||||
- job:
|
- job:
|
||||||
name: watcher-tempest-multinode
|
name: watcher-tempest-multinode
|
||||||
|
|||||||
@@ -338,6 +338,19 @@ function stop_watcher {
|
|||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# configure_tempest_for_watcher() - Configure Tempest for watcher
|
||||||
|
function configure_tempest_for_watcher {
|
||||||
|
# Set default microversion for watcher-tempest-plugin
|
||||||
|
# Please make sure to update this when the microversion is updated, otherwise
|
||||||
|
# new tests may be skipped.
|
||||||
|
TEMPEST_WATCHER_MIN_MICROVERSION=${TEMPEST_WATCHER_MIN_MICROVERSION:-"1.0"}
|
||||||
|
TEMPEST_WATCHER_MAX_MICROVERSION=${TEMPEST_WATCHER_MAX_MICROVERSION:-"1.4"}
|
||||||
|
|
||||||
|
# Set microversion options in tempest.conf
|
||||||
|
iniset $TEMPEST_CONFIG optimize min_microversion $TEMPEST_WATCHER_MIN_MICROVERSION
|
||||||
|
iniset $TEMPEST_CONFIG optimize max_microversion $TEMPEST_WATCHER_MAX_MICROVERSION
|
||||||
|
}
|
||||||
|
|
||||||
# Restore xtrace
|
# Restore xtrace
|
||||||
$_XTRACE_WATCHER
|
$_XTRACE_WATCHER
|
||||||
|
|
||||||
|
|||||||
@@ -38,6 +38,9 @@ if is_service_enabled watcher-api watcher-decision-engine watcher-applier; then
|
|||||||
# Start the watcher components
|
# Start the watcher components
|
||||||
echo_summary "Starting watcher"
|
echo_summary "Starting watcher"
|
||||||
start_watcher
|
start_watcher
|
||||||
|
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
|
||||||
|
echo_summary "Configuring tempest for watcher"
|
||||||
|
configure_tempest_for_watcher
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [[ "$1" == "unstack" ]]; then
|
if [[ "$1" == "unstack" ]]; then
|
||||||
|
|||||||
47
releasenotes/notes/bug-2112187-763bae283e0b736d.yaml
Normal file
47
releasenotes/notes/bug-2112187-763bae283e0b736d.yaml
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
---
|
||||||
|
security:
|
||||||
|
- |
|
||||||
|
Watchers no longer forges requests on behalf of a tenant when
|
||||||
|
swapping volumes. Prior to this release watcher had 2 implementations
|
||||||
|
of moving a volume, it could use cinders volume migrate api or its own
|
||||||
|
internal implementation that directly calls nova volume attachment update
|
||||||
|
api. The former is safe and the recommend way to move volumes between
|
||||||
|
cinder storage backend the internal implementation was insecure, fragile
|
||||||
|
due to a lack of error handling and capable of deleting user data.
|
||||||
|
|
||||||
|
Insecure: the internal volume migration operation created a new keystone
|
||||||
|
user with a weak name and password and added it to the tenants project
|
||||||
|
with the admin role. It then used that user to forge request on behalf
|
||||||
|
of the tenant with admin right to swap the volume. if the applier was
|
||||||
|
restarted during the execution of this operation it would never be cleaned
|
||||||
|
up.
|
||||||
|
|
||||||
|
Fragile: the error handling was minimal, the swap volume api is async
|
||||||
|
so watcher has to poll for completion, there was no support to resume
|
||||||
|
that if interrupted of the time out was exceeded.
|
||||||
|
|
||||||
|
Data-loss: while the internal polling logic returned success or failure
|
||||||
|
watcher did not check the result, once the function returned it
|
||||||
|
unconditionally deleted the source volume. For larger volumes this
|
||||||
|
could result in irretrievable data loss.
|
||||||
|
|
||||||
|
Finally if a volume was swapped using the internal workflow it put
|
||||||
|
the nova instance in an out of sync state. If the VM was live migrated
|
||||||
|
after the swap volume completed successfully prior to a hard reboot
|
||||||
|
then the migration would fail or succeed and break tenant isolation.
|
||||||
|
|
||||||
|
see: https://bugs.launchpad.net/nova/+bug/2112187 for details.
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
All code related to creating keystone user and granting roles has been
|
||||||
|
removed. The internal swap volume implementation has been removed and
|
||||||
|
replaced by cinders volume migrate api. Note as part of this change
|
||||||
|
Watcher will no longer attempt volume migrations or retypes if the
|
||||||
|
instance is in the `Verify Resize` task state. This resolves several
|
||||||
|
issues related to volume migration in the zone migration and
|
||||||
|
Storage capacity balance strategies. While efforts have been made
|
||||||
|
to maintain backward compatibility these changes are required to
|
||||||
|
address a security weakness in watcher's prior approach.
|
||||||
|
|
||||||
|
see: https://bugs.launchpad.net/nova/+bug/2112187 for more context.
|
||||||
|
|
||||||
2
tox.ini
2
tox.ini
@@ -8,7 +8,7 @@ basepython = python3
|
|||||||
usedevelop = True
|
usedevelop = True
|
||||||
allowlist_externals = find
|
allowlist_externals = find
|
||||||
rm
|
rm
|
||||||
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
|
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2024.1} {opts} {packages}
|
||||||
setenv =
|
setenv =
|
||||||
VIRTUAL_ENV={envdir}
|
VIRTUAL_ENV={envdir}
|
||||||
deps =
|
deps =
|
||||||
|
|||||||
@@ -17,14 +17,11 @@ import jsonschema
|
|||||||
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
from cinderclient import client as cinder_client
|
|
||||||
from watcher._i18n import _
|
from watcher._i18n import _
|
||||||
from watcher.applier.actions import base
|
from watcher.applier.actions import base
|
||||||
from watcher.common import cinder_helper
|
from watcher.common import cinder_helper
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
from watcher.common import keystone_helper
|
|
||||||
from watcher.common import nova_helper
|
from watcher.common import nova_helper
|
||||||
from watcher.common import utils
|
|
||||||
from watcher import conf
|
from watcher import conf
|
||||||
|
|
||||||
CONF = conf.CONF
|
CONF = conf.CONF
|
||||||
@@ -70,8 +67,6 @@ class VolumeMigrate(base.BaseAction):
|
|||||||
|
|
||||||
def __init__(self, config, osc=None):
|
def __init__(self, config, osc=None):
|
||||||
super(VolumeMigrate, self).__init__(config)
|
super(VolumeMigrate, self).__init__(config)
|
||||||
self.temp_username = utils.random_string(10)
|
|
||||||
self.temp_password = utils.random_string(10)
|
|
||||||
self.cinder_util = cinder_helper.CinderHelper(osc=self.osc)
|
self.cinder_util = cinder_helper.CinderHelper(osc=self.osc)
|
||||||
self.nova_util = nova_helper.NovaHelper(osc=self.osc)
|
self.nova_util = nova_helper.NovaHelper(osc=self.osc)
|
||||||
|
|
||||||
@@ -134,83 +129,42 @@ class VolumeMigrate(base.BaseAction):
|
|||||||
|
|
||||||
def _can_swap(self, volume):
|
def _can_swap(self, volume):
|
||||||
"""Judge volume can be swapped"""
|
"""Judge volume can be swapped"""
|
||||||
|
# TODO(sean-k-mooney): rename this to _can_migrate and update
|
||||||
|
# tests to reflect that.
|
||||||
|
|
||||||
|
# cinder volume migration can migrate volumes that are not
|
||||||
|
# attached to instances or nova can migrate the data for cinder
|
||||||
|
# if the volume is in-use. If the volume has no attachments
|
||||||
|
# allow cinder to decided if it can be migrated.
|
||||||
if not volume.attachments:
|
if not volume.attachments:
|
||||||
return False
|
LOG.debug(f"volume: {volume.id} has no attachments")
|
||||||
instance_id = volume.attachments[0]['server_id']
|
|
||||||
instance_status = self.nova_util.find_instance(instance_id).status
|
|
||||||
|
|
||||||
if (volume.status == 'in-use' and
|
|
||||||
instance_status in ('ACTIVE', 'PAUSED', 'RESIZED')):
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
# since it has attachments we need to validate nova's constraints
|
||||||
|
instance_id = volume.attachments[0]['server_id']
|
||||||
def _create_user(self, volume, user):
|
instance_status = self.nova_util.find_instance(instance_id).status
|
||||||
"""Create user with volume attribute and user information"""
|
LOG.debug(
|
||||||
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
|
f"volume: {volume.id} is attached to instance: {instance_id} "
|
||||||
project_id = getattr(volume, 'os-vol-tenant-attr:tenant_id')
|
f"in instance status: {instance_status}")
|
||||||
user['project'] = project_id
|
# NOTE(sean-k-mooney): This used to allow RESIZED which
|
||||||
user['domain'] = keystone_util.get_project(project_id).domain_id
|
# is the resize_verify task state, that is not an acceptable time
|
||||||
user['roles'] = ['admin']
|
# to migrate volumes, if nova does not block this in the API
|
||||||
return keystone_util.create_user(user)
|
# today that is probably a bug. PAUSED is also questionable but
|
||||||
|
# it should generally be safe.
|
||||||
def _get_cinder_client(self, session):
|
return (volume.status == 'in-use' and
|
||||||
"""Get cinder client by session"""
|
instance_status in ('ACTIVE', 'PAUSED'))
|
||||||
return cinder_client.Client(
|
|
||||||
CONF.cinder_client.api_version,
|
|
||||||
session=session,
|
|
||||||
endpoint_type=CONF.cinder_client.endpoint_type)
|
|
||||||
|
|
||||||
def _swap_volume(self, volume, dest_type):
|
|
||||||
"""Swap volume to dest_type
|
|
||||||
|
|
||||||
Limitation note: only for compute libvirt driver
|
|
||||||
"""
|
|
||||||
if not dest_type:
|
|
||||||
raise exception.Invalid(
|
|
||||||
message=(_("destination type is required when "
|
|
||||||
"migration type is swap")))
|
|
||||||
|
|
||||||
if not self._can_swap(volume):
|
|
||||||
raise exception.Invalid(
|
|
||||||
message=(_("Invalid state for swapping volume")))
|
|
||||||
|
|
||||||
user_info = {
|
|
||||||
'name': self.temp_username,
|
|
||||||
'password': self.temp_password}
|
|
||||||
user = self._create_user(volume, user_info)
|
|
||||||
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
|
|
||||||
try:
|
|
||||||
session = keystone_util.create_session(
|
|
||||||
user.id, self.temp_password)
|
|
||||||
temp_cinder = self._get_cinder_client(session)
|
|
||||||
|
|
||||||
# swap volume
|
|
||||||
new_volume = self.cinder_util.create_volume(
|
|
||||||
temp_cinder, volume, dest_type)
|
|
||||||
self.nova_util.swap_volume(volume, new_volume)
|
|
||||||
|
|
||||||
# delete old volume
|
|
||||||
self.cinder_util.delete_volume(volume)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
keystone_util.delete_user(user)
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _migrate(self, volume_id, dest_node, dest_type):
|
def _migrate(self, volume_id, dest_node, dest_type):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
volume = self.cinder_util.get_volume(volume_id)
|
volume = self.cinder_util.get_volume(volume_id)
|
||||||
if self.migration_type == self.SWAP:
|
# for backward compatibility map swap to migrate.
|
||||||
if dest_node:
|
if self.migration_type in (self.SWAP, self.MIGRATE):
|
||||||
LOG.warning("dest_node is ignored")
|
if not self._can_swap(volume):
|
||||||
return self._swap_volume(volume, dest_type)
|
raise exception.Invalid(
|
||||||
|
message=(_("Invalid state for swapping volume")))
|
||||||
|
return self.cinder_util.migrate(volume, dest_node)
|
||||||
elif self.migration_type == self.RETYPE:
|
elif self.migration_type == self.RETYPE:
|
||||||
return self.cinder_util.retype(volume, dest_type)
|
return self.cinder_util.retype(volume, dest_type)
|
||||||
elif self.migration_type == self.MIGRATE:
|
|
||||||
return self.cinder_util.migrate(volume, dest_node)
|
|
||||||
else:
|
else:
|
||||||
raise exception.Invalid(
|
raise exception.Invalid(
|
||||||
message=(_("Migration of type '%(migration_type)s' is not "
|
message=(_("Migration of type '%(migration_type)s' is not "
|
||||||
|
|||||||
@@ -11,12 +11,15 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_context import context
|
from oslo_context import context
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@enginefacade.transaction_context_provider
|
||||||
class RequestContext(context.RequestContext):
|
class RequestContext(context.RequestContext):
|
||||||
"""Extends security contexts from the OpenStack common library."""
|
"""Extends security contexts from the OpenStack common library."""
|
||||||
|
|
||||||
|
|||||||
@@ -15,8 +15,6 @@
|
|||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
from keystoneauth1.exceptions import http as ks_exceptions
|
from keystoneauth1.exceptions import http as ks_exceptions
|
||||||
from keystoneauth1 import loading
|
|
||||||
from keystoneauth1 import session
|
|
||||||
from watcher._i18n import _
|
from watcher._i18n import _
|
||||||
from watcher.common import clients
|
from watcher.common import clients
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
@@ -90,35 +88,3 @@ class KeystoneHelper(object):
|
|||||||
message=(_("Domain name seems ambiguous: %s") %
|
message=(_("Domain name seems ambiguous: %s") %
|
||||||
name_or_id))
|
name_or_id))
|
||||||
return domains[0]
|
return domains[0]
|
||||||
|
|
||||||
def create_session(self, user_id, password):
|
|
||||||
user = self.get_user(user_id)
|
|
||||||
loader = loading.get_plugin_loader('password')
|
|
||||||
auth = loader.load_from_options(
|
|
||||||
auth_url=CONF.watcher_clients_auth.auth_url,
|
|
||||||
password=password,
|
|
||||||
user_id=user_id,
|
|
||||||
project_id=user.default_project_id)
|
|
||||||
return session.Session(auth=auth)
|
|
||||||
|
|
||||||
def create_user(self, user):
|
|
||||||
project = self.get_project(user['project'])
|
|
||||||
domain = self.get_domain(user['domain'])
|
|
||||||
_user = self.keystone.users.create(
|
|
||||||
user['name'],
|
|
||||||
password=user['password'],
|
|
||||||
domain=domain,
|
|
||||||
project=project,
|
|
||||||
)
|
|
||||||
for role in user['roles']:
|
|
||||||
role = self.get_role(role)
|
|
||||||
self.keystone.roles.grant(
|
|
||||||
role.id, user=_user.id, project=project.id)
|
|
||||||
return _user
|
|
||||||
|
|
||||||
def delete_user(self, user):
|
|
||||||
try:
|
|
||||||
user = self.get_user(user)
|
|
||||||
self.keystone.users.delete(user)
|
|
||||||
except exception.Invalid:
|
|
||||||
pass
|
|
||||||
|
|||||||
@@ -19,9 +19,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import datetime
|
import datetime
|
||||||
import inspect
|
import inspect
|
||||||
import random
|
|
||||||
import re
|
import re
|
||||||
import string
|
|
||||||
|
|
||||||
from croniter import croniter
|
from croniter import croniter
|
||||||
import eventlet
|
import eventlet
|
||||||
@@ -160,14 +158,10 @@ def extend_with_strict_schema(validator_class):
|
|||||||
StrictDefaultValidatingDraft4Validator = extend_with_default(
|
StrictDefaultValidatingDraft4Validator = extend_with_default(
|
||||||
extend_with_strict_schema(validators.Draft4Validator))
|
extend_with_strict_schema(validators.Draft4Validator))
|
||||||
|
|
||||||
|
|
||||||
Draft4Validator = validators.Draft4Validator
|
Draft4Validator = validators.Draft4Validator
|
||||||
|
|
||||||
|
|
||||||
def random_string(n):
|
|
||||||
return ''.join([random.choice(
|
|
||||||
string.ascii_letters + string.digits) for i in range(n)])
|
|
||||||
|
|
||||||
|
|
||||||
# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
|
# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
|
||||||
# As a workaround, we're delegating such calls to a native thread.
|
# As a workaround, we're delegating such calls to a native thread.
|
||||||
def async_compat_call(f, *args, **kwargs):
|
def async_compat_call(f, *args, **kwargs):
|
||||||
|
|||||||
@@ -13,8 +13,8 @@
|
|||||||
from logging import config as log_config
|
from logging import config as log_config
|
||||||
|
|
||||||
from alembic import context
|
from alembic import context
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
|
|
||||||
from watcher.db.sqlalchemy import api as sqla_api
|
|
||||||
from watcher.db.sqlalchemy import models
|
from watcher.db.sqlalchemy import models
|
||||||
|
|
||||||
# this is the Alembic Config object, which provides
|
# this is the Alembic Config object, which provides
|
||||||
@@ -43,7 +43,7 @@ def run_migrations_online():
|
|||||||
and associate a connection with the context.
|
and associate a connection with the context.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
engine = sqla_api.get_engine()
|
engine = enginefacade.writer.get_engine()
|
||||||
with engine.connect() as connection:
|
with engine.connect() as connection:
|
||||||
context.configure(connection=connection,
|
context.configure(connection=connection,
|
||||||
target_metadata=target_metadata)
|
target_metadata=target_metadata)
|
||||||
|
|||||||
@@ -19,10 +19,12 @@
|
|||||||
import collections
|
import collections
|
||||||
import datetime
|
import datetime
|
||||||
import operator
|
import operator
|
||||||
|
import threading
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_db import api as oslo_db_api
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
from oslo_db.sqlalchemy import session as db_session
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
from oslo_db.sqlalchemy import utils as db_utils
|
from oslo_db.sqlalchemy import utils as db_utils
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from sqlalchemy.inspection import inspect
|
from sqlalchemy.inspection import inspect
|
||||||
@@ -38,28 +40,7 @@ from watcher import objects
|
|||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
_FACADE = None
|
_CONTEXT = threading.local()
|
||||||
|
|
||||||
|
|
||||||
def _create_facade_lazily():
|
|
||||||
global _FACADE
|
|
||||||
if _FACADE is None:
|
|
||||||
# FIXME(amoralej): Remove autocommit=True (and ideally use of
|
|
||||||
# LegacyEngineFacade) asap since it's not compatible with SQLAlchemy
|
|
||||||
# 2.0.
|
|
||||||
_FACADE = db_session.EngineFacade.from_config(CONF,
|
|
||||||
autocommit=True)
|
|
||||||
return _FACADE
|
|
||||||
|
|
||||||
|
|
||||||
def get_engine():
|
|
||||||
facade = _create_facade_lazily()
|
|
||||||
return facade.get_engine()
|
|
||||||
|
|
||||||
|
|
||||||
def get_session(**kwargs):
|
|
||||||
facade = _create_facade_lazily()
|
|
||||||
return facade.get_session(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def get_backend():
|
def get_backend():
|
||||||
@@ -67,14 +48,15 @@ def get_backend():
|
|||||||
return Connection()
|
return Connection()
|
||||||
|
|
||||||
|
|
||||||
def model_query(model, *args, **kwargs):
|
def _session_for_read():
|
||||||
"""Query helper for simpler session usage.
|
return enginefacade.reader.using(_CONTEXT)
|
||||||
|
|
||||||
:param session: if present, the session to use
|
|
||||||
"""
|
# NOTE(tylerchristie) Please add @oslo_db_api.retry_on_deadlock decorator to
|
||||||
session = kwargs.get('session') or get_session()
|
# any new methods using _session_for_write (as deadlocks happen on write), so
|
||||||
query = session.query(model, *args)
|
# that oslo_db is able to retry in case of deadlocks.
|
||||||
return query
|
def _session_for_write():
|
||||||
|
return enginefacade.writer.using(_CONTEXT)
|
||||||
|
|
||||||
|
|
||||||
def add_identity_filter(query, value):
|
def add_identity_filter(query, value):
|
||||||
@@ -97,8 +79,6 @@ def add_identity_filter(query, value):
|
|||||||
|
|
||||||
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||||
sort_dir=None, query=None):
|
sort_dir=None, query=None):
|
||||||
if not query:
|
|
||||||
query = model_query(model)
|
|
||||||
sort_keys = ['id']
|
sort_keys = ['id']
|
||||||
if sort_key and sort_key not in sort_keys:
|
if sort_key and sort_key not in sort_keys:
|
||||||
sort_keys.insert(0, sort_key)
|
sort_keys.insert(0, sort_key)
|
||||||
@@ -251,35 +231,39 @@ class Connection(api.BaseConnection):
|
|||||||
query = query.options(joinedload(relationship.key))
|
query = query.options(joinedload(relationship.key))
|
||||||
return query
|
return query
|
||||||
|
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _create(self, model, values):
|
def _create(self, model, values):
|
||||||
obj = model()
|
with _session_for_write() as session:
|
||||||
cleaned_values = {k: v for k, v in values.items()
|
obj = model()
|
||||||
if k not in self._get_relationships(model)}
|
cleaned_values = {k: v for k, v in values.items()
|
||||||
obj.update(cleaned_values)
|
if k not in self._get_relationships(model)}
|
||||||
obj.save()
|
obj.update(cleaned_values)
|
||||||
return obj
|
session.add(obj)
|
||||||
|
session.flush()
|
||||||
|
return obj
|
||||||
|
|
||||||
def _get(self, context, model, fieldname, value, eager):
|
def _get(self, context, model, fieldname, value, eager):
|
||||||
query = model_query(model)
|
with _session_for_read() as session:
|
||||||
if eager:
|
query = session.query(model)
|
||||||
query = self._set_eager_options(model, query)
|
if eager:
|
||||||
|
query = self._set_eager_options(model, query)
|
||||||
|
|
||||||
query = query.filter(getattr(model, fieldname) == value)
|
query = query.filter(getattr(model, fieldname) == value)
|
||||||
if not context.show_deleted:
|
if not context.show_deleted:
|
||||||
query = query.filter(model.deleted_at.is_(None))
|
query = query.filter(model.deleted_at.is_(None))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
obj = query.one()
|
obj = query.one()
|
||||||
except exc.NoResultFound:
|
except exc.NoResultFound:
|
||||||
raise exception.ResourceNotFound(name=model.__name__, id=value)
|
raise exception.ResourceNotFound(name=model.__name__, id=value)
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _update(model, id_, values):
|
def _update(model, id_, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
query = add_identity_filter(query, id_)
|
query = add_identity_filter(query, id_)
|
||||||
try:
|
try:
|
||||||
ref = query.with_for_update().one()
|
ref = query.with_for_update().one()
|
||||||
@@ -287,13 +271,14 @@ class Connection(api.BaseConnection):
|
|||||||
raise exception.ResourceNotFound(name=model.__name__, id=id_)
|
raise exception.ResourceNotFound(name=model.__name__, id=id_)
|
||||||
|
|
||||||
ref.update(values)
|
ref.update(values)
|
||||||
return ref
|
|
||||||
|
return ref
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _soft_delete(model, id_):
|
def _soft_delete(model, id_):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
query = add_identity_filter(query, id_)
|
query = add_identity_filter(query, id_)
|
||||||
try:
|
try:
|
||||||
row = query.one()
|
row = query.one()
|
||||||
@@ -305,10 +290,10 @@ class Connection(api.BaseConnection):
|
|||||||
return row
|
return row
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _destroy(model, id_):
|
def _destroy(model, id_):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
query = add_identity_filter(query, id_)
|
query = add_identity_filter(query, id_)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -321,14 +306,15 @@ class Connection(api.BaseConnection):
|
|||||||
def _get_model_list(self, model, add_filters_func, context, filters=None,
|
def _get_model_list(self, model, add_filters_func, context, filters=None,
|
||||||
limit=None, marker=None, sort_key=None, sort_dir=None,
|
limit=None, marker=None, sort_key=None, sort_dir=None,
|
||||||
eager=False):
|
eager=False):
|
||||||
query = model_query(model)
|
with _session_for_read() as session:
|
||||||
if eager:
|
query = session.query(model)
|
||||||
query = self._set_eager_options(model, query)
|
if eager:
|
||||||
query = add_filters_func(query, filters)
|
query = self._set_eager_options(model, query)
|
||||||
if not context.show_deleted:
|
query = add_filters_func(query, filters)
|
||||||
query = query.filter(model.deleted_at.is_(None))
|
if not context.show_deleted:
|
||||||
return _paginate_query(model, limit, marker,
|
query = query.filter(model.deleted_at.is_(None))
|
||||||
sort_key, sort_dir, query)
|
return _paginate_query(model, limit, marker,
|
||||||
|
sort_key, sort_dir, query)
|
||||||
|
|
||||||
# NOTE(erakli): _add_..._filters methods should be refactored to have same
|
# NOTE(erakli): _add_..._filters methods should be refactored to have same
|
||||||
# content. join_fieldmap should be filled with JoinMap instead of dict
|
# content. join_fieldmap should be filled with JoinMap instead of dict
|
||||||
@@ -423,11 +409,12 @@ class Connection(api.BaseConnection):
|
|||||||
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
|
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
|
||||||
|
|
||||||
if 'audit_uuid' in filters:
|
if 'audit_uuid' in filters:
|
||||||
stmt = model_query(models.ActionPlan).join(
|
with _session_for_read() as session:
|
||||||
models.Audit,
|
stmt = session.query(models.ActionPlan).join(
|
||||||
models.Audit.id == models.ActionPlan.audit_id)\
|
models.Audit,
|
||||||
.filter_by(uuid=filters['audit_uuid']).subquery()
|
models.Audit.id == models.ActionPlan.audit_id)\
|
||||||
query = query.filter_by(action_plan_id=stmt.c.id)
|
.filter_by(uuid=filters['audit_uuid']).subquery()
|
||||||
|
query = query.filter_by(action_plan_id=stmt.c.id)
|
||||||
|
|
||||||
return query
|
return query
|
||||||
|
|
||||||
@@ -605,20 +592,21 @@ class Connection(api.BaseConnection):
|
|||||||
if not values.get('uuid'):
|
if not values.get('uuid'):
|
||||||
values['uuid'] = utils.generate_uuid()
|
values['uuid'] = utils.generate_uuid()
|
||||||
|
|
||||||
query = model_query(models.AuditTemplate)
|
with _session_for_write() as session:
|
||||||
query = query.filter_by(name=values.get('name'),
|
query = session.query(models.AuditTemplate)
|
||||||
deleted_at=None)
|
query = query.filter_by(name=values.get('name'),
|
||||||
|
deleted_at=None)
|
||||||
|
|
||||||
if len(query.all()) > 0:
|
if len(query.all()) > 0:
|
||||||
raise exception.AuditTemplateAlreadyExists(
|
raise exception.AuditTemplateAlreadyExists(
|
||||||
audit_template=values['name'])
|
audit_template=values['name'])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
audit_template = self._create(models.AuditTemplate, values)
|
audit_template = self._create(models.AuditTemplate, values)
|
||||||
except db_exc.DBDuplicateEntry:
|
except db_exc.DBDuplicateEntry:
|
||||||
raise exception.AuditTemplateAlreadyExists(
|
raise exception.AuditTemplateAlreadyExists(
|
||||||
audit_template=values['name'])
|
audit_template=values['name'])
|
||||||
return audit_template
|
return audit_template
|
||||||
|
|
||||||
def _get_audit_template(self, context, fieldname, value, eager):
|
def _get_audit_template(self, context, fieldname, value, eager):
|
||||||
try:
|
try:
|
||||||
@@ -680,25 +668,26 @@ class Connection(api.BaseConnection):
|
|||||||
if not values.get('uuid'):
|
if not values.get('uuid'):
|
||||||
values['uuid'] = utils.generate_uuid()
|
values['uuid'] = utils.generate_uuid()
|
||||||
|
|
||||||
query = model_query(models.Audit)
|
with _session_for_write() as session:
|
||||||
query = query.filter_by(name=values.get('name'),
|
query = session.query(models.Audit)
|
||||||
deleted_at=None)
|
query = query.filter_by(name=values.get('name'),
|
||||||
|
deleted_at=None)
|
||||||
|
|
||||||
if len(query.all()) > 0:
|
if len(query.all()) > 0:
|
||||||
raise exception.AuditAlreadyExists(
|
raise exception.AuditAlreadyExists(
|
||||||
audit=values['name'])
|
audit=values['name'])
|
||||||
|
|
||||||
if values.get('state') is None:
|
if values.get('state') is None:
|
||||||
values['state'] = objects.audit.State.PENDING
|
values['state'] = objects.audit.State.PENDING
|
||||||
|
|
||||||
if not values.get('auto_trigger'):
|
if not values.get('auto_trigger'):
|
||||||
values['auto_trigger'] = False
|
values['auto_trigger'] = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
audit = self._create(models.Audit, values)
|
audit = self._create(models.Audit, values)
|
||||||
except db_exc.DBDuplicateEntry:
|
except db_exc.DBDuplicateEntry:
|
||||||
raise exception.AuditAlreadyExists(audit=values['uuid'])
|
raise exception.AuditAlreadyExists(audit=values['uuid'])
|
||||||
return audit
|
return audit
|
||||||
|
|
||||||
def _get_audit(self, context, fieldname, value, eager):
|
def _get_audit(self, context, fieldname, value, eager):
|
||||||
try:
|
try:
|
||||||
@@ -722,14 +711,13 @@ class Connection(api.BaseConnection):
|
|||||||
def destroy_audit(self, audit_id):
|
def destroy_audit(self, audit_id):
|
||||||
def is_audit_referenced(session, audit_id):
|
def is_audit_referenced(session, audit_id):
|
||||||
"""Checks whether the audit is referenced by action_plan(s)."""
|
"""Checks whether the audit is referenced by action_plan(s)."""
|
||||||
query = model_query(models.ActionPlan, session=session)
|
query = session.query(models.ActionPlan)
|
||||||
query = self._add_action_plans_filters(
|
query = self._add_action_plans_filters(
|
||||||
query, {'audit_id': audit_id})
|
query, {'audit_id': audit_id})
|
||||||
return query.count() != 0
|
return query.count() != 0
|
||||||
|
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.Audit)
|
||||||
query = model_query(models.Audit, session=session)
|
|
||||||
query = add_identity_filter(query, audit_id)
|
query = add_identity_filter(query, audit_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -796,9 +784,8 @@ class Connection(api.BaseConnection):
|
|||||||
context, fieldname="uuid", value=action_uuid, eager=eager)
|
context, fieldname="uuid", value=action_uuid, eager=eager)
|
||||||
|
|
||||||
def destroy_action(self, action_id):
|
def destroy_action(self, action_id):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.Action)
|
||||||
query = model_query(models.Action, session=session)
|
|
||||||
query = add_identity_filter(query, action_id)
|
query = add_identity_filter(query, action_id)
|
||||||
count = query.delete()
|
count = query.delete()
|
||||||
if count != 1:
|
if count != 1:
|
||||||
@@ -814,9 +801,8 @@ class Connection(api.BaseConnection):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _do_update_action(action_id, values):
|
def _do_update_action(action_id, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.Action)
|
||||||
query = model_query(models.Action, session=session)
|
|
||||||
query = add_identity_filter(query, action_id)
|
query = add_identity_filter(query, action_id)
|
||||||
try:
|
try:
|
||||||
ref = query.with_for_update().one()
|
ref = query.with_for_update().one()
|
||||||
@@ -824,7 +810,7 @@ class Connection(api.BaseConnection):
|
|||||||
raise exception.ActionNotFound(action=action_id)
|
raise exception.ActionNotFound(action=action_id)
|
||||||
|
|
||||||
ref.update(values)
|
ref.update(values)
|
||||||
return ref
|
return ref
|
||||||
|
|
||||||
def soft_delete_action(self, action_id):
|
def soft_delete_action(self, action_id):
|
||||||
try:
|
try:
|
||||||
@@ -868,14 +854,13 @@ class Connection(api.BaseConnection):
|
|||||||
def destroy_action_plan(self, action_plan_id):
|
def destroy_action_plan(self, action_plan_id):
|
||||||
def is_action_plan_referenced(session, action_plan_id):
|
def is_action_plan_referenced(session, action_plan_id):
|
||||||
"""Checks whether the action_plan is referenced by action(s)."""
|
"""Checks whether the action_plan is referenced by action(s)."""
|
||||||
query = model_query(models.Action, session=session)
|
query = session.query(models.Action)
|
||||||
query = self._add_actions_filters(
|
query = self._add_actions_filters(
|
||||||
query, {'action_plan_id': action_plan_id})
|
query, {'action_plan_id': action_plan_id})
|
||||||
return query.count() != 0
|
return query.count() != 0
|
||||||
|
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.ActionPlan)
|
||||||
query = model_query(models.ActionPlan, session=session)
|
|
||||||
query = add_identity_filter(query, action_plan_id)
|
query = add_identity_filter(query, action_plan_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -899,9 +884,8 @@ class Connection(api.BaseConnection):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _do_update_action_plan(action_plan_id, values):
|
def _do_update_action_plan(action_plan_id, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.ActionPlan)
|
||||||
query = model_query(models.ActionPlan, session=session)
|
|
||||||
query = add_identity_filter(query, action_plan_id)
|
query = add_identity_filter(query, action_plan_id)
|
||||||
try:
|
try:
|
||||||
ref = query.with_for_update().one()
|
ref = query.with_for_update().one()
|
||||||
@@ -909,7 +893,7 @@ class Connection(api.BaseConnection):
|
|||||||
raise exception.ActionPlanNotFound(action_plan=action_plan_id)
|
raise exception.ActionPlanNotFound(action_plan=action_plan_id)
|
||||||
|
|
||||||
ref.update(values)
|
ref.update(values)
|
||||||
return ref
|
return ref
|
||||||
|
|
||||||
def soft_delete_action_plan(self, action_plan_id):
|
def soft_delete_action_plan(self, action_plan_id):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -20,9 +20,9 @@ import alembic
|
|||||||
from alembic import config as alembic_config
|
from alembic import config as alembic_config
|
||||||
import alembic.migration as alembic_migration
|
import alembic.migration as alembic_migration
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
|
|
||||||
from watcher._i18n import _
|
from watcher._i18n import _
|
||||||
from watcher.db.sqlalchemy import api as sqla_api
|
|
||||||
from watcher.db.sqlalchemy import models
|
from watcher.db.sqlalchemy import models
|
||||||
|
|
||||||
|
|
||||||
@@ -39,7 +39,7 @@ def version(engine=None):
|
|||||||
:rtype: string
|
:rtype: string
|
||||||
"""
|
"""
|
||||||
if engine is None:
|
if engine is None:
|
||||||
engine = sqla_api.get_engine()
|
engine = enginefacade.reader.get_engine()
|
||||||
with engine.connect() as conn:
|
with engine.connect() as conn:
|
||||||
context = alembic_migration.MigrationContext.configure(conn)
|
context = alembic_migration.MigrationContext.configure(conn)
|
||||||
return context.get_current_revision()
|
return context.get_current_revision()
|
||||||
@@ -63,7 +63,7 @@ def create_schema(config=None, engine=None):
|
|||||||
Can be used for initial installation instead of upgrade('head').
|
Can be used for initial installation instead of upgrade('head').
|
||||||
"""
|
"""
|
||||||
if engine is None:
|
if engine is None:
|
||||||
engine = sqla_api.get_engine()
|
engine = enginefacade.writer.get_engine()
|
||||||
|
|
||||||
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
||||||
# schema, it will only add the new tables, but leave
|
# schema, it will only add the new tables, but leave
|
||||||
|
|||||||
@@ -93,14 +93,6 @@ class WatcherBase(models.SoftDeleteMixin,
|
|||||||
d[c.name] = self[c.name]
|
d[c.name] = self[c.name]
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def save(self, session=None):
|
|
||||||
import watcher.db.sqlalchemy.api as db_api
|
|
||||||
|
|
||||||
if session is None:
|
|
||||||
session = db_api.get_session()
|
|
||||||
|
|
||||||
super(WatcherBase, self).save(session)
|
|
||||||
|
|
||||||
|
|
||||||
Base = declarative_base(cls=WatcherBase)
|
Base = declarative_base(cls=WatcherBase)
|
||||||
|
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ class ContinuousAuditHandler(base.AuditHandler):
|
|||||||
self._audit_scheduler = scheduling.BackgroundSchedulerService(
|
self._audit_scheduler = scheduling.BackgroundSchedulerService(
|
||||||
jobstores={
|
jobstores={
|
||||||
'default': job_store.WatcherJobStore(
|
'default': job_store.WatcherJobStore(
|
||||||
engine=sq_api.get_engine()),
|
engine=sq_api.enginefacade.writer.get_engine()),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
return self._audit_scheduler
|
return self._audit_scheduler
|
||||||
|
|||||||
@@ -57,8 +57,19 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
|||||||
self.planned_cold_count = 0
|
self.planned_cold_count = 0
|
||||||
self.volume_count = 0
|
self.volume_count = 0
|
||||||
self.planned_volume_count = 0
|
self.planned_volume_count = 0
|
||||||
self.volume_update_count = 0
|
|
||||||
self.planned_volume_update_count = 0
|
# TODO(sean-n-mooney) This is backward compatibility
|
||||||
|
# for calling the swap code paths. Swap is now an alias
|
||||||
|
# for migrate, we should clean this up in a future
|
||||||
|
# cycle.
|
||||||
|
@property
|
||||||
|
def volume_update_count(self):
|
||||||
|
return self.volume_count
|
||||||
|
|
||||||
|
# same as above clean up later.
|
||||||
|
@property
|
||||||
|
def planned_volume_update_count(self):
|
||||||
|
return self.planned_volume_count
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_name(cls):
|
def get_name(cls):
|
||||||
@@ -312,8 +323,8 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
|||||||
planned_cold_migrate_instance_count=self.planned_cold_count,
|
planned_cold_migrate_instance_count=self.planned_cold_count,
|
||||||
volume_migrate_count=self.volume_count,
|
volume_migrate_count=self.volume_count,
|
||||||
planned_volume_migrate_count=self.planned_volume_count,
|
planned_volume_migrate_count=self.planned_volume_count,
|
||||||
volume_update_count=self.volume_update_count,
|
volume_update_count=self.volume_count,
|
||||||
planned_volume_update_count=self.planned_volume_update_count
|
planned_volume_update_count=self.planned_volume_count
|
||||||
)
|
)
|
||||||
|
|
||||||
def set_migration_count(self, targets):
|
def set_migration_count(self, targets):
|
||||||
@@ -328,10 +339,7 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
|||||||
elif self.is_cold(instance):
|
elif self.is_cold(instance):
|
||||||
self.cold_count += 1
|
self.cold_count += 1
|
||||||
for volume in targets.get('volume', []):
|
for volume in targets.get('volume', []):
|
||||||
if self.is_available(volume):
|
self.volume_count += 1
|
||||||
self.volume_count += 1
|
|
||||||
elif self.is_in_use(volume):
|
|
||||||
self.volume_update_count += 1
|
|
||||||
|
|
||||||
def is_live(self, instance):
|
def is_live(self, instance):
|
||||||
status = getattr(instance, 'status')
|
status = getattr(instance, 'status')
|
||||||
@@ -404,19 +412,16 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
|||||||
LOG.debug(src_type)
|
LOG.debug(src_type)
|
||||||
LOG.debug("%s %s", dst_pool, dst_type)
|
LOG.debug("%s %s", dst_pool, dst_type)
|
||||||
|
|
||||||
if self.is_available(volume):
|
if src_type == dst_type:
|
||||||
if src_type == dst_type:
|
self._volume_migrate(volume, dst_pool)
|
||||||
self._volume_migrate(volume, dst_pool)
|
else:
|
||||||
else:
|
self._volume_retype(volume, dst_type)
|
||||||
self._volume_retype(volume, dst_type)
|
|
||||||
elif self.is_in_use(volume):
|
|
||||||
self._volume_update(volume, dst_type)
|
|
||||||
|
|
||||||
# if with_attached_volume is True, migrate attaching instances
|
# if with_attached_volume is True, migrate attaching instances
|
||||||
if self.with_attached_volume:
|
if self.with_attached_volume:
|
||||||
instances = [self.nova.find_instance(dic.get('server_id'))
|
instances = [self.nova.find_instance(dic.get('server_id'))
|
||||||
for dic in volume.attachments]
|
for dic in volume.attachments]
|
||||||
self.instances_migration(instances, action_counter)
|
self.instances_migration(instances, action_counter)
|
||||||
|
|
||||||
action_counter.add_pool(pool)
|
action_counter.add_pool(pool)
|
||||||
|
|
||||||
@@ -464,16 +469,6 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
|||||||
input_parameters=parameters)
|
input_parameters=parameters)
|
||||||
self.planned_cold_count += 1
|
self.planned_cold_count += 1
|
||||||
|
|
||||||
def _volume_update(self, volume, dst_type):
|
|
||||||
parameters = {"migration_type": "swap",
|
|
||||||
"destination_type": dst_type,
|
|
||||||
"resource_name": volume.name}
|
|
||||||
self.solution.add_action(
|
|
||||||
action_type="volume_migrate",
|
|
||||||
resource_id=volume.id,
|
|
||||||
input_parameters=parameters)
|
|
||||||
self.planned_volume_update_count += 1
|
|
||||||
|
|
||||||
def _volume_migrate(self, volume, dst_pool):
|
def _volume_migrate(self, volume, dst_pool):
|
||||||
parameters = {"migration_type": "migrate",
|
parameters = {"migration_type": "migrate",
|
||||||
"destination_node": dst_pool,
|
"destination_node": dst_pool,
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ from watcher.common import cinder_helper
|
|||||||
from watcher.common import clients
|
from watcher.common import clients
|
||||||
from watcher.common import keystone_helper
|
from watcher.common import keystone_helper
|
||||||
from watcher.common import nova_helper
|
from watcher.common import nova_helper
|
||||||
from watcher.common import utils as w_utils
|
|
||||||
from watcher.tests import base
|
from watcher.tests import base
|
||||||
|
|
||||||
|
|
||||||
@@ -102,12 +101,15 @@ class TestMigration(base.TestCase):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def fake_volume(**kwargs):
|
def fake_volume(**kwargs):
|
||||||
|
# FIXME(sean-k-mooney): we should be using real objects in this
|
||||||
|
# test or at lease something more Representative of the real data
|
||||||
volume = mock.MagicMock()
|
volume = mock.MagicMock()
|
||||||
volume.id = kwargs.get('id', TestMigration.VOLUME_UUID)
|
volume.id = kwargs.get('id', TestMigration.VOLUME_UUID)
|
||||||
volume.size = kwargs.get('size', '1')
|
volume.size = kwargs.get('size', '1')
|
||||||
volume.status = kwargs.get('status', 'available')
|
volume.status = kwargs.get('status', 'available')
|
||||||
volume.snapshot_id = kwargs.get('snapshot_id', None)
|
volume.snapshot_id = kwargs.get('snapshot_id', None)
|
||||||
volume.availability_zone = kwargs.get('availability_zone', 'nova')
|
volume.availability_zone = kwargs.get('availability_zone', 'nova')
|
||||||
|
volume.attachments = kwargs.get('attachments', [])
|
||||||
return volume
|
return volume
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -175,42 +177,14 @@ class TestMigration(base.TestCase):
|
|||||||
"storage1-typename",
|
"storage1-typename",
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_swap_success(self):
|
|
||||||
volume = self.fake_volume(
|
|
||||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
|
||||||
self.m_n_helper.find_instance.return_value = self.fake_instance()
|
|
||||||
|
|
||||||
new_volume = self.fake_volume(id=w_utils.generate_uuid())
|
|
||||||
user = mock.Mock()
|
|
||||||
session = mock.MagicMock()
|
|
||||||
self.m_k_helper.create_user.return_value = user
|
|
||||||
self.m_k_helper.create_session.return_value = session
|
|
||||||
self.m_c_helper.get_volume.return_value = volume
|
|
||||||
self.m_c_helper.create_volume.return_value = new_volume
|
|
||||||
|
|
||||||
result = self.action_swap.execute()
|
|
||||||
self.assertTrue(result)
|
|
||||||
|
|
||||||
self.m_n_helper.swap_volume.assert_called_once_with(
|
|
||||||
volume,
|
|
||||||
new_volume
|
|
||||||
)
|
|
||||||
self.m_k_helper.delete_user.assert_called_once_with(user)
|
|
||||||
|
|
||||||
def test_swap_fail(self):
|
|
||||||
# _can_swap fail
|
|
||||||
instance = self.fake_instance(status='STOPPED')
|
|
||||||
self.m_n_helper.find_instance.return_value = instance
|
|
||||||
|
|
||||||
result = self.action_swap.execute()
|
|
||||||
self.assertFalse(result)
|
|
||||||
|
|
||||||
def test_can_swap_success(self):
|
def test_can_swap_success(self):
|
||||||
volume = self.fake_volume(
|
volume = self.fake_volume(
|
||||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
status='in-use', attachments=[
|
||||||
instance = self.fake_instance()
|
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||||
|
|
||||||
|
instance = self.fake_instance()
|
||||||
self.m_n_helper.find_instance.return_value = instance
|
self.m_n_helper.find_instance.return_value = instance
|
||||||
|
|
||||||
result = self.action_swap._can_swap(volume)
|
result = self.action_swap._can_swap(volume)
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
|
|
||||||
@@ -219,16 +193,33 @@ class TestMigration(base.TestCase):
|
|||||||
result = self.action_swap._can_swap(volume)
|
result = self.action_swap._can_swap(volume)
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
|
|
||||||
instance = self.fake_instance(status='RESIZED')
|
|
||||||
self.m_n_helper.find_instance.return_value = instance
|
|
||||||
result = self.action_swap._can_swap(volume)
|
|
||||||
self.assertTrue(result)
|
|
||||||
|
|
||||||
def test_can_swap_fail(self):
|
def test_can_swap_fail(self):
|
||||||
|
|
||||||
volume = self.fake_volume(
|
volume = self.fake_volume(
|
||||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
status='in-use', attachments=[
|
||||||
|
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||||
instance = self.fake_instance(status='STOPPED')
|
instance = self.fake_instance(status='STOPPED')
|
||||||
self.m_n_helper.find_instance.return_value = instance
|
self.m_n_helper.find_instance.return_value = instance
|
||||||
result = self.action_swap._can_swap(volume)
|
result = self.action_swap._can_swap(volume)
|
||||||
self.assertFalse(result)
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
instance = self.fake_instance(status='RESIZED')
|
||||||
|
self.m_n_helper.find_instance.return_value = instance
|
||||||
|
result = self.action_swap._can_swap(volume)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_swap_success(self):
|
||||||
|
volume = self.fake_volume(
|
||||||
|
status='in-use', attachments=[
|
||||||
|
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||||
|
self.m_c_helper.get_volume.return_value = volume
|
||||||
|
|
||||||
|
instance = self.fake_instance()
|
||||||
|
self.m_n_helper.find_instance.return_value = instance
|
||||||
|
|
||||||
|
result = self.action_swap.execute()
|
||||||
|
self.assertTrue(result)
|
||||||
|
self.m_c_helper.migrate.assert_called_once_with(
|
||||||
|
volume,
|
||||||
|
"storage1-poolname"
|
||||||
|
)
|
||||||
|
|||||||
@@ -17,9 +17,10 @@
|
|||||||
|
|
||||||
import fixtures
|
import fixtures
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
|
|
||||||
|
|
||||||
from watcher.db import api as dbapi
|
from watcher.db import api as dbapi
|
||||||
from watcher.db.sqlalchemy import api as sqla_api
|
|
||||||
from watcher.db.sqlalchemy import migration
|
from watcher.db.sqlalchemy import migration
|
||||||
from watcher.db.sqlalchemy import models
|
from watcher.db.sqlalchemy import models
|
||||||
from watcher.tests import base
|
from watcher.tests import base
|
||||||
@@ -35,16 +36,16 @@ _DB_CACHE = None
|
|||||||
|
|
||||||
class Database(fixtures.Fixture):
|
class Database(fixtures.Fixture):
|
||||||
|
|
||||||
def __init__(self, db_api, db_migrate, sql_connection):
|
def __init__(self, engine, db_migrate, sql_connection):
|
||||||
self.sql_connection = sql_connection
|
self.sql_connection = sql_connection
|
||||||
|
|
||||||
self.engine = db_api.get_engine()
|
self.engine = engine
|
||||||
self.engine.dispose()
|
self.engine.dispose()
|
||||||
conn = self.engine.connect()
|
|
||||||
self.setup_sqlite(db_migrate)
|
|
||||||
self.post_migrations()
|
|
||||||
|
|
||||||
self._DB = "".join(line for line in conn.connection.iterdump())
|
with self.engine.connect() as conn:
|
||||||
|
self.setup_sqlite(db_migrate)
|
||||||
|
self.post_migrations()
|
||||||
|
self._DB = "".join(line for line in conn.connection.iterdump())
|
||||||
self.engine.dispose()
|
self.engine.dispose()
|
||||||
|
|
||||||
def setup_sqlite(self, db_migrate):
|
def setup_sqlite(self, db_migrate):
|
||||||
@@ -55,9 +56,8 @@ class Database(fixtures.Fixture):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(Database, self).setUp()
|
super(Database, self).setUp()
|
||||||
|
with self.engine.connect() as conn:
|
||||||
conn = self.engine.connect()
|
conn.connection.executescript(self._DB)
|
||||||
conn.connection.executescript(self._DB)
|
|
||||||
self.addCleanup(self.engine.dispose)
|
self.addCleanup(self.engine.dispose)
|
||||||
|
|
||||||
def post_migrations(self):
|
def post_migrations(self):
|
||||||
@@ -80,7 +80,9 @@ class DbTestCase(base.TestCase):
|
|||||||
|
|
||||||
global _DB_CACHE
|
global _DB_CACHE
|
||||||
if not _DB_CACHE:
|
if not _DB_CACHE:
|
||||||
_DB_CACHE = Database(sqla_api, migration,
|
engine = enginefacade.writer.get_engine()
|
||||||
|
_DB_CACHE = Database(engine, migration,
|
||||||
sql_connection=CONF.database.connection)
|
sql_connection=CONF.database.connection)
|
||||||
|
engine.dispose()
|
||||||
self.useFixture(_DB_CACHE)
|
self.useFixture(_DB_CACHE)
|
||||||
self._id_gen = utils.id_generator()
|
self._id_gen = utils.id_generator()
|
||||||
|
|||||||
@@ -263,7 +263,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
cfg.CONF.set_override("host", "hostname1")
|
cfg.CONF.set_override("host", "hostname1")
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -286,7 +286,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
self.assertIsNone(self.audits[1].next_run_time)
|
self.assertIsNone(self.audits[1].next_run_time)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -309,7 +309,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
|
|
||||||
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
|
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -328,7 +328,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
audit_handler.launch_audits_periodically)
|
audit_handler.launch_audits_periodically)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -349,7 +349,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
m_add_job.assert_has_calls(calls)
|
m_add_job.assert_has_calls(calls)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -384,7 +384,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
self.assertTrue(is_inactive)
|
self.assertTrue(is_inactive)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.AuditStateTransitionManager,
|
@mock.patch.object(objects.audit.AuditStateTransitionManager,
|
||||||
'is_inactive')
|
'is_inactive')
|
||||||
@@ -406,7 +406,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
self.assertIsNotNone(self.audits[0].next_run_time)
|
self.assertIsNotNone(self.audits[0].next_run_time)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
|
|||||||
@@ -320,7 +320,10 @@ class TestZoneMigration(TestBaseStrategy):
|
|||||||
migration_types = collections.Counter(
|
migration_types = collections.Counter(
|
||||||
[action.get('input_parameters')['migration_type']
|
[action.get('input_parameters')['migration_type']
|
||||||
for action in solution.actions])
|
for action in solution.actions])
|
||||||
self.assertEqual(1, migration_types.get("swap", 0))
|
# watcher no longer implements swap. it is now an
|
||||||
|
# alias for migrate.
|
||||||
|
self.assertEqual(0, migration_types.get("swap", 0))
|
||||||
|
self.assertEqual(1, migration_types.get("migrate", 1))
|
||||||
global_efficacy_value = solution.global_efficacy[3].get('value', 0)
|
global_efficacy_value = solution.global_efficacy[3].get('value', 0)
|
||||||
self.assertEqual(100, global_efficacy_value)
|
self.assertEqual(100, global_efficacy_value)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user