Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
456f42e1b9 | ||
|
|
b5b1bc5473 | ||
|
|
c049a533e3 | ||
|
|
dbc06d1504 | ||
|
|
54b3b58428 | ||
|
|
8b0f1dbf66 | ||
|
|
0af13220da | ||
|
|
f85521f3c6 | ||
|
|
238bb50f53 | ||
|
|
db85f32675 | ||
|
|
f6015fd625 | ||
|
|
a9dc3794a6 | ||
|
|
d6f169197e | ||
|
|
bc5922c684 |
@@ -2,3 +2,4 @@
|
||||
host=review.opendev.org
|
||||
port=29418
|
||||
project=openstack/watcher.git
|
||||
defaultbranch=stable/2024.2
|
||||
|
||||
@@ -85,6 +85,7 @@
|
||||
vars:
|
||||
tempest_concurrency: 1
|
||||
tempest_test_regex: watcher_tempest_plugin.tests.scenario.test_execute_strategies
|
||||
tempest_exclude_regex: .*\[.*\breal_load\b.*\].*
|
||||
|
||||
- job:
|
||||
name: watcher-tempest-multinode
|
||||
|
||||
@@ -275,6 +275,9 @@ function install_watcherclient {
|
||||
git_clone_by_name "python-watcherclient"
|
||||
setup_dev_lib "python-watcherclient"
|
||||
fi
|
||||
if [[ "$GLOBAL_VENV" == "True" ]]; then
|
||||
sudo ln -sf /opt/stack/data/venv/bin/watcher /usr/local/bin
|
||||
fi
|
||||
}
|
||||
|
||||
# install_watcher() - Collect source and prepare
|
||||
@@ -338,6 +341,19 @@ function stop_watcher {
|
||||
done
|
||||
}
|
||||
|
||||
# configure_tempest_for_watcher() - Configure Tempest for watcher
|
||||
function configure_tempest_for_watcher {
|
||||
# Set default microversion for watcher-tempest-plugin
|
||||
# Please make sure to update this when the microversion is updated, otherwise
|
||||
# new tests may be skipped.
|
||||
TEMPEST_WATCHER_MIN_MICROVERSION=${TEMPEST_WATCHER_MIN_MICROVERSION:-"1.0"}
|
||||
TEMPEST_WATCHER_MAX_MICROVERSION=${TEMPEST_WATCHER_MAX_MICROVERSION:-"1.4"}
|
||||
|
||||
# Set microversion options in tempest.conf
|
||||
iniset $TEMPEST_CONFIG optimize min_microversion $TEMPEST_WATCHER_MIN_MICROVERSION
|
||||
iniset $TEMPEST_CONFIG optimize max_microversion $TEMPEST_WATCHER_MAX_MICROVERSION
|
||||
}
|
||||
|
||||
# Restore xtrace
|
||||
$_XTRACE_WATCHER
|
||||
|
||||
|
||||
@@ -38,6 +38,9 @@ if is_service_enabled watcher-api watcher-decision-engine watcher-applier; then
|
||||
# Start the watcher components
|
||||
echo_summary "Starting watcher"
|
||||
start_watcher
|
||||
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
|
||||
echo_summary "Configuring tempest for watcher"
|
||||
configure_tempest_for_watcher
|
||||
fi
|
||||
|
||||
if [[ "$1" == "unstack" ]]; then
|
||||
|
||||
@@ -70,7 +70,7 @@ then write_uwsgi_config "$WATCHER_UWSGI_CONF" "$WATCHER_UWSGI" "/infra-optim"
|
||||
fi
|
||||
|
||||
# Migrate the database
|
||||
watcher-db-manage upgrade || die $LINO "DB migration error"
|
||||
$WATCHER_BIN_DIR/watcher-db-manage upgrade || die $LINO "DB migration error"
|
||||
|
||||
start_watcher
|
||||
|
||||
|
||||
47
releasenotes/notes/bug-2112187-763bae283e0b736d.yaml
Normal file
47
releasenotes/notes/bug-2112187-763bae283e0b736d.yaml
Normal file
@@ -0,0 +1,47 @@
|
||||
---
|
||||
security:
|
||||
- |
|
||||
Watchers no longer forges requests on behalf of a tenant when
|
||||
swapping volumes. Prior to this release watcher had 2 implementations
|
||||
of moving a volume, it could use cinders volume migrate api or its own
|
||||
internal implementation that directly calls nova volume attachment update
|
||||
api. The former is safe and the recommend way to move volumes between
|
||||
cinder storage backend the internal implementation was insecure, fragile
|
||||
due to a lack of error handling and capable of deleting user data.
|
||||
|
||||
Insecure: the internal volume migration operation created a new keystone
|
||||
user with a weak name and password and added it to the tenants project
|
||||
with the admin role. It then used that user to forge request on behalf
|
||||
of the tenant with admin right to swap the volume. if the applier was
|
||||
restarted during the execution of this operation it would never be cleaned
|
||||
up.
|
||||
|
||||
Fragile: the error handling was minimal, the swap volume api is async
|
||||
so watcher has to poll for completion, there was no support to resume
|
||||
that if interrupted of the time out was exceeded.
|
||||
|
||||
Data-loss: while the internal polling logic returned success or failure
|
||||
watcher did not check the result, once the function returned it
|
||||
unconditionally deleted the source volume. For larger volumes this
|
||||
could result in irretrievable data loss.
|
||||
|
||||
Finally if a volume was swapped using the internal workflow it put
|
||||
the nova instance in an out of sync state. If the VM was live migrated
|
||||
after the swap volume completed successfully prior to a hard reboot
|
||||
then the migration would fail or succeed and break tenant isolation.
|
||||
|
||||
see: https://bugs.launchpad.net/nova/+bug/2112187 for details.
|
||||
fixes:
|
||||
- |
|
||||
All code related to creating keystone user and granting roles has been
|
||||
removed. The internal swap volume implementation has been removed and
|
||||
replaced by cinders volume migrate api. Note as part of this change
|
||||
Watcher will no longer attempt volume migrations or retypes if the
|
||||
instance is in the `Verify Resize` task state. This resolves several
|
||||
issues related to volume migration in the zone migration and
|
||||
Storage capacity balance strategies. While efforts have been made
|
||||
to maintain backward compatibility these changes are required to
|
||||
address a security weakness in watcher's prior approach.
|
||||
|
||||
see: https://bugs.launchpad.net/nova/+bug/2112187 for more context.
|
||||
|
||||
@@ -3,15 +3,16 @@
|
||||
# Andi Chandler <andi@gowling.com>, 2020. #zanata
|
||||
# Andi Chandler <andi@gowling.com>, 2022. #zanata
|
||||
# Andi Chandler <andi@gowling.com>, 2023. #zanata
|
||||
# Andi Chandler <andi@gowling.com>, 2024. #zanata
|
||||
msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: python-watcher\n"
|
||||
"Report-Msgid-Bugs-To: \n"
|
||||
"POT-Creation-Date: 2023-08-14 03:05+0000\n"
|
||||
"POT-Creation-Date: 2024-05-31 14:40+0000\n"
|
||||
"MIME-Version: 1.0\n"
|
||||
"Content-Type: text/plain; charset=UTF-8\n"
|
||||
"Content-Transfer-Encoding: 8bit\n"
|
||||
"PO-Revision-Date: 2023-06-21 07:54+0000\n"
|
||||
"PO-Revision-Date: 2024-04-18 12:21+0000\n"
|
||||
"Last-Translator: Andi Chandler <andi@gowling.com>\n"
|
||||
"Language-Team: English (United Kingdom)\n"
|
||||
"Language: en_GB\n"
|
||||
@@ -63,6 +64,9 @@ msgstr "2.0.0"
|
||||
msgid "2023.1 Series Release Notes"
|
||||
msgstr "2023.1 Series Release Notes"
|
||||
|
||||
msgid "2023.2 Series Release Notes"
|
||||
msgstr "2023.2 Series Release Notes"
|
||||
|
||||
msgid "3.0.0"
|
||||
msgstr "3.0.0"
|
||||
|
||||
|
||||
2
tox.ini
2
tox.ini
@@ -8,7 +8,7 @@ basepython = python3
|
||||
usedevelop = True
|
||||
allowlist_externals = find
|
||||
rm
|
||||
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
|
||||
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2024.2} {opts} {packages}
|
||||
setenv =
|
||||
VIRTUAL_ENV={envdir}
|
||||
deps =
|
||||
|
||||
@@ -17,14 +17,11 @@ import jsonschema
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from cinderclient import client as cinder_client
|
||||
from watcher._i18n import _
|
||||
from watcher.applier.actions import base
|
||||
from watcher.common import cinder_helper
|
||||
from watcher.common import exception
|
||||
from watcher.common import keystone_helper
|
||||
from watcher.common import nova_helper
|
||||
from watcher.common import utils
|
||||
from watcher import conf
|
||||
|
||||
CONF = conf.CONF
|
||||
@@ -70,8 +67,6 @@ class VolumeMigrate(base.BaseAction):
|
||||
|
||||
def __init__(self, config, osc=None):
|
||||
super(VolumeMigrate, self).__init__(config)
|
||||
self.temp_username = utils.random_string(10)
|
||||
self.temp_password = utils.random_string(10)
|
||||
self.cinder_util = cinder_helper.CinderHelper(osc=self.osc)
|
||||
self.nova_util = nova_helper.NovaHelper(osc=self.osc)
|
||||
|
||||
@@ -134,83 +129,42 @@ class VolumeMigrate(base.BaseAction):
|
||||
|
||||
def _can_swap(self, volume):
|
||||
"""Judge volume can be swapped"""
|
||||
# TODO(sean-k-mooney): rename this to _can_migrate and update
|
||||
# tests to reflect that.
|
||||
|
||||
# cinder volume migration can migrate volumes that are not
|
||||
# attached to instances or nova can migrate the data for cinder
|
||||
# if the volume is in-use. If the volume has no attachments
|
||||
# allow cinder to decided if it can be migrated.
|
||||
if not volume.attachments:
|
||||
return False
|
||||
instance_id = volume.attachments[0]['server_id']
|
||||
instance_status = self.nova_util.find_instance(instance_id).status
|
||||
|
||||
if (volume.status == 'in-use' and
|
||||
instance_status in ('ACTIVE', 'PAUSED', 'RESIZED')):
|
||||
LOG.debug(f"volume: {volume.id} has no attachments")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _create_user(self, volume, user):
|
||||
"""Create user with volume attribute and user information"""
|
||||
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
|
||||
project_id = getattr(volume, 'os-vol-tenant-attr:tenant_id')
|
||||
user['project'] = project_id
|
||||
user['domain'] = keystone_util.get_project(project_id).domain_id
|
||||
user['roles'] = ['admin']
|
||||
return keystone_util.create_user(user)
|
||||
|
||||
def _get_cinder_client(self, session):
|
||||
"""Get cinder client by session"""
|
||||
return cinder_client.Client(
|
||||
CONF.cinder_client.api_version,
|
||||
session=session,
|
||||
endpoint_type=CONF.cinder_client.endpoint_type)
|
||||
|
||||
def _swap_volume(self, volume, dest_type):
|
||||
"""Swap volume to dest_type
|
||||
|
||||
Limitation note: only for compute libvirt driver
|
||||
"""
|
||||
if not dest_type:
|
||||
raise exception.Invalid(
|
||||
message=(_("destination type is required when "
|
||||
"migration type is swap")))
|
||||
|
||||
if not self._can_swap(volume):
|
||||
raise exception.Invalid(
|
||||
message=(_("Invalid state for swapping volume")))
|
||||
|
||||
user_info = {
|
||||
'name': self.temp_username,
|
||||
'password': self.temp_password}
|
||||
user = self._create_user(volume, user_info)
|
||||
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
|
||||
try:
|
||||
session = keystone_util.create_session(
|
||||
user.id, self.temp_password)
|
||||
temp_cinder = self._get_cinder_client(session)
|
||||
|
||||
# swap volume
|
||||
new_volume = self.cinder_util.create_volume(
|
||||
temp_cinder, volume, dest_type)
|
||||
self.nova_util.swap_volume(volume, new_volume)
|
||||
|
||||
# delete old volume
|
||||
self.cinder_util.delete_volume(volume)
|
||||
|
||||
finally:
|
||||
keystone_util.delete_user(user)
|
||||
|
||||
return True
|
||||
# since it has attachments we need to validate nova's constraints
|
||||
instance_id = volume.attachments[0]['server_id']
|
||||
instance_status = self.nova_util.find_instance(instance_id).status
|
||||
LOG.debug(
|
||||
f"volume: {volume.id} is attached to instance: {instance_id} "
|
||||
f"in instance status: {instance_status}")
|
||||
# NOTE(sean-k-mooney): This used to allow RESIZED which
|
||||
# is the resize_verify task state, that is not an acceptable time
|
||||
# to migrate volumes, if nova does not block this in the API
|
||||
# today that is probably a bug. PAUSED is also questionable but
|
||||
# it should generally be safe.
|
||||
return (volume.status == 'in-use' and
|
||||
instance_status in ('ACTIVE', 'PAUSED'))
|
||||
|
||||
def _migrate(self, volume_id, dest_node, dest_type):
|
||||
|
||||
try:
|
||||
volume = self.cinder_util.get_volume(volume_id)
|
||||
if self.migration_type == self.SWAP:
|
||||
if dest_node:
|
||||
LOG.warning("dest_node is ignored")
|
||||
return self._swap_volume(volume, dest_type)
|
||||
# for backward compatibility map swap to migrate.
|
||||
if self.migration_type in (self.SWAP, self.MIGRATE):
|
||||
if not self._can_swap(volume):
|
||||
raise exception.Invalid(
|
||||
message=(_("Invalid state for swapping volume")))
|
||||
return self.cinder_util.migrate(volume, dest_node)
|
||||
elif self.migration_type == self.RETYPE:
|
||||
return self.cinder_util.retype(volume, dest_type)
|
||||
elif self.migration_type == self.MIGRATE:
|
||||
return self.cinder_util.migrate(volume, dest_node)
|
||||
else:
|
||||
raise exception.Invalid(
|
||||
message=(_("Migration of type '%(migration_type)s' is not "
|
||||
|
||||
@@ -11,12 +11,15 @@
|
||||
# under the License.
|
||||
|
||||
from oslo_context import context
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@enginefacade.transaction_context_provider
|
||||
class RequestContext(context.RequestContext):
|
||||
"""Extends security contexts from the OpenStack common library."""
|
||||
|
||||
|
||||
@@ -15,8 +15,6 @@
|
||||
from oslo_log import log
|
||||
|
||||
from keystoneauth1.exceptions import http as ks_exceptions
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import session
|
||||
from watcher._i18n import _
|
||||
from watcher.common import clients
|
||||
from watcher.common import exception
|
||||
@@ -90,35 +88,3 @@ class KeystoneHelper(object):
|
||||
message=(_("Domain name seems ambiguous: %s") %
|
||||
name_or_id))
|
||||
return domains[0]
|
||||
|
||||
def create_session(self, user_id, password):
|
||||
user = self.get_user(user_id)
|
||||
loader = loading.get_plugin_loader('password')
|
||||
auth = loader.load_from_options(
|
||||
auth_url=CONF.watcher_clients_auth.auth_url,
|
||||
password=password,
|
||||
user_id=user_id,
|
||||
project_id=user.default_project_id)
|
||||
return session.Session(auth=auth)
|
||||
|
||||
def create_user(self, user):
|
||||
project = self.get_project(user['project'])
|
||||
domain = self.get_domain(user['domain'])
|
||||
_user = self.keystone.users.create(
|
||||
user['name'],
|
||||
password=user['password'],
|
||||
domain=domain,
|
||||
project=project,
|
||||
)
|
||||
for role in user['roles']:
|
||||
role = self.get_role(role)
|
||||
self.keystone.roles.grant(
|
||||
role.id, user=_user.id, project=project.id)
|
||||
return _user
|
||||
|
||||
def delete_user(self, user):
|
||||
try:
|
||||
user = self.get_user(user)
|
||||
self.keystone.users.delete(user)
|
||||
except exception.Invalid:
|
||||
pass
|
||||
|
||||
@@ -19,9 +19,7 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import inspect
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
|
||||
from croniter import croniter
|
||||
import eventlet
|
||||
@@ -160,14 +158,10 @@ def extend_with_strict_schema(validator_class):
|
||||
StrictDefaultValidatingDraft4Validator = extend_with_default(
|
||||
extend_with_strict_schema(validators.Draft4Validator))
|
||||
|
||||
|
||||
Draft4Validator = validators.Draft4Validator
|
||||
|
||||
|
||||
def random_string(n):
|
||||
return ''.join([random.choice(
|
||||
string.ascii_letters + string.digits) for i in range(n)])
|
||||
|
||||
|
||||
# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
|
||||
# As a workaround, we're delegating such calls to a native thread.
|
||||
def async_compat_call(f, *args, **kwargs):
|
||||
|
||||
@@ -13,8 +13,8 @@
|
||||
from logging import config as log_config
|
||||
|
||||
from alembic import context
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
from watcher.db.sqlalchemy import api as sqla_api
|
||||
from watcher.db.sqlalchemy import models
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
@@ -43,7 +43,7 @@ def run_migrations_online():
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.writer.get_engine()
|
||||
with engine.connect() as connection:
|
||||
context.configure(connection=connection,
|
||||
target_metadata=target_metadata)
|
||||
|
||||
@@ -28,7 +28,7 @@ def upgrade():
|
||||
|
||||
op.create_table(
|
||||
'apscheduler_jobs',
|
||||
sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False),
|
||||
sa.Column('id', sa.Unicode(191),
|
||||
nullable=False),
|
||||
sa.Column('next_run_time', sa.Float(25), index=True),
|
||||
sa.Column('job_state', sa.LargeBinary, nullable=False),
|
||||
|
||||
@@ -19,10 +19,12 @@
|
||||
import collections
|
||||
import datetime
|
||||
import operator
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import session as db_session
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import utils as db_utils
|
||||
from oslo_utils import timeutils
|
||||
from sqlalchemy.inspection import inspect
|
||||
@@ -38,28 +40,7 @@ from watcher import objects
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
_FACADE = None
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
if _FACADE is None:
|
||||
# FIXME(amoralej): Remove autocommit=True (and ideally use of
|
||||
# LegacyEngineFacade) asap since it's not compatible with SQLAlchemy
|
||||
# 2.0.
|
||||
_FACADE = db_session.EngineFacade.from_config(CONF,
|
||||
autocommit=True)
|
||||
return _FACADE
|
||||
|
||||
|
||||
def get_engine():
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_engine()
|
||||
|
||||
|
||||
def get_session(**kwargs):
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_session(**kwargs)
|
||||
_CONTEXT = threading.local()
|
||||
|
||||
|
||||
def get_backend():
|
||||
@@ -67,14 +48,15 @@ def get_backend():
|
||||
return Connection()
|
||||
|
||||
|
||||
def model_query(model, *args, **kwargs):
|
||||
"""Query helper for simpler session usage.
|
||||
def _session_for_read():
|
||||
return enginefacade.reader.using(_CONTEXT)
|
||||
|
||||
:param session: if present, the session to use
|
||||
"""
|
||||
session = kwargs.get('session') or get_session()
|
||||
query = session.query(model, *args)
|
||||
return query
|
||||
|
||||
# NOTE(tylerchristie) Please add @oslo_db_api.retry_on_deadlock decorator to
|
||||
# any new methods using _session_for_write (as deadlocks happen on write), so
|
||||
# that oslo_db is able to retry in case of deadlocks.
|
||||
def _session_for_write():
|
||||
return enginefacade.writer.using(_CONTEXT)
|
||||
|
||||
|
||||
def add_identity_filter(query, value):
|
||||
@@ -97,8 +79,6 @@ def add_identity_filter(query, value):
|
||||
|
||||
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||
sort_dir=None, query=None):
|
||||
if not query:
|
||||
query = model_query(model)
|
||||
sort_keys = ['id']
|
||||
if sort_key and sort_key not in sort_keys:
|
||||
sort_keys.insert(0, sort_key)
|
||||
@@ -248,38 +228,43 @@ class Connection(api.BaseConnection):
|
||||
for relationship in relationships:
|
||||
if not relationship.uselist:
|
||||
# We have a One-to-X relationship
|
||||
query = query.options(joinedload(relationship.key))
|
||||
query = query.options(joinedload(
|
||||
getattr(model, relationship.key)))
|
||||
return query
|
||||
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _create(self, model, values):
|
||||
obj = model()
|
||||
cleaned_values = {k: v for k, v in values.items()
|
||||
if k not in self._get_relationships(model)}
|
||||
obj.update(cleaned_values)
|
||||
obj.save()
|
||||
return obj
|
||||
with _session_for_write() as session:
|
||||
obj = model()
|
||||
cleaned_values = {k: v for k, v in values.items()
|
||||
if k not in self._get_relationships(model)}
|
||||
obj.update(cleaned_values)
|
||||
session.add(obj)
|
||||
session.flush()
|
||||
return obj
|
||||
|
||||
def _get(self, context, model, fieldname, value, eager):
|
||||
query = model_query(model)
|
||||
if eager:
|
||||
query = self._set_eager_options(model, query)
|
||||
with _session_for_read() as session:
|
||||
query = session.query(model)
|
||||
if eager:
|
||||
query = self._set_eager_options(model, query)
|
||||
|
||||
query = query.filter(getattr(model, fieldname) == value)
|
||||
if not context.show_deleted:
|
||||
query = query.filter(model.deleted_at.is_(None))
|
||||
query = query.filter(getattr(model, fieldname) == value)
|
||||
if not context.show_deleted:
|
||||
query = query.filter(model.deleted_at.is_(None))
|
||||
|
||||
try:
|
||||
obj = query.one()
|
||||
except exc.NoResultFound:
|
||||
raise exception.ResourceNotFound(name=model.__name__, id=value)
|
||||
try:
|
||||
obj = query.one()
|
||||
except exc.NoResultFound:
|
||||
raise exception.ResourceNotFound(name=model.__name__, id=value)
|
||||
|
||||
return obj
|
||||
return obj
|
||||
|
||||
@staticmethod
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _update(model, id_, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(model)
|
||||
query = add_identity_filter(query, id_)
|
||||
try:
|
||||
ref = query.with_for_update().one()
|
||||
@@ -287,13 +272,14 @@ class Connection(api.BaseConnection):
|
||||
raise exception.ResourceNotFound(name=model.__name__, id=id_)
|
||||
|
||||
ref.update(values)
|
||||
return ref
|
||||
|
||||
return ref
|
||||
|
||||
@staticmethod
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _soft_delete(model, id_):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(model)
|
||||
query = add_identity_filter(query, id_)
|
||||
try:
|
||||
row = query.one()
|
||||
@@ -305,10 +291,10 @@ class Connection(api.BaseConnection):
|
||||
return row
|
||||
|
||||
@staticmethod
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _destroy(model, id_):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(model)
|
||||
query = add_identity_filter(query, id_)
|
||||
|
||||
try:
|
||||
@@ -321,14 +307,15 @@ class Connection(api.BaseConnection):
|
||||
def _get_model_list(self, model, add_filters_func, context, filters=None,
|
||||
limit=None, marker=None, sort_key=None, sort_dir=None,
|
||||
eager=False):
|
||||
query = model_query(model)
|
||||
if eager:
|
||||
query = self._set_eager_options(model, query)
|
||||
query = add_filters_func(query, filters)
|
||||
if not context.show_deleted:
|
||||
query = query.filter(model.deleted_at.is_(None))
|
||||
return _paginate_query(model, limit, marker,
|
||||
sort_key, sort_dir, query)
|
||||
with _session_for_read() as session:
|
||||
query = session.query(model)
|
||||
if eager:
|
||||
query = self._set_eager_options(model, query)
|
||||
query = add_filters_func(query, filters)
|
||||
if not context.show_deleted:
|
||||
query = query.filter(model.deleted_at.is_(None))
|
||||
return _paginate_query(model, limit, marker,
|
||||
sort_key, sort_dir, query)
|
||||
|
||||
# NOTE(erakli): _add_..._filters methods should be refactored to have same
|
||||
# content. join_fieldmap should be filled with JoinMap instead of dict
|
||||
@@ -423,11 +410,12 @@ class Connection(api.BaseConnection):
|
||||
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
|
||||
|
||||
if 'audit_uuid' in filters:
|
||||
stmt = model_query(models.ActionPlan).join(
|
||||
models.Audit,
|
||||
models.Audit.id == models.ActionPlan.audit_id)\
|
||||
.filter_by(uuid=filters['audit_uuid']).subquery()
|
||||
query = query.filter_by(action_plan_id=stmt.c.id)
|
||||
with _session_for_read() as session:
|
||||
stmt = session.query(models.ActionPlan).join(
|
||||
models.Audit,
|
||||
models.Audit.id == models.ActionPlan.audit_id)\
|
||||
.filter_by(uuid=filters['audit_uuid']).subquery()
|
||||
query = query.filter_by(action_plan_id=stmt.c.id)
|
||||
|
||||
return query
|
||||
|
||||
@@ -605,20 +593,21 @@ class Connection(api.BaseConnection):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = utils.generate_uuid()
|
||||
|
||||
query = model_query(models.AuditTemplate)
|
||||
query = query.filter_by(name=values.get('name'),
|
||||
deleted_at=None)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.AuditTemplate)
|
||||
query = query.filter_by(name=values.get('name'),
|
||||
deleted_at=None)
|
||||
|
||||
if len(query.all()) > 0:
|
||||
raise exception.AuditTemplateAlreadyExists(
|
||||
audit_template=values['name'])
|
||||
if len(query.all()) > 0:
|
||||
raise exception.AuditTemplateAlreadyExists(
|
||||
audit_template=values['name'])
|
||||
|
||||
try:
|
||||
audit_template = self._create(models.AuditTemplate, values)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.AuditTemplateAlreadyExists(
|
||||
audit_template=values['name'])
|
||||
return audit_template
|
||||
try:
|
||||
audit_template = self._create(models.AuditTemplate, values)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.AuditTemplateAlreadyExists(
|
||||
audit_template=values['name'])
|
||||
return audit_template
|
||||
|
||||
def _get_audit_template(self, context, fieldname, value, eager):
|
||||
try:
|
||||
@@ -680,25 +669,26 @@ class Connection(api.BaseConnection):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = utils.generate_uuid()
|
||||
|
||||
query = model_query(models.Audit)
|
||||
query = query.filter_by(name=values.get('name'),
|
||||
deleted_at=None)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Audit)
|
||||
query = query.filter_by(name=values.get('name'),
|
||||
deleted_at=None)
|
||||
|
||||
if len(query.all()) > 0:
|
||||
raise exception.AuditAlreadyExists(
|
||||
audit=values['name'])
|
||||
if len(query.all()) > 0:
|
||||
raise exception.AuditAlreadyExists(
|
||||
audit=values['name'])
|
||||
|
||||
if values.get('state') is None:
|
||||
values['state'] = objects.audit.State.PENDING
|
||||
if values.get('state') is None:
|
||||
values['state'] = objects.audit.State.PENDING
|
||||
|
||||
if not values.get('auto_trigger'):
|
||||
values['auto_trigger'] = False
|
||||
if not values.get('auto_trigger'):
|
||||
values['auto_trigger'] = False
|
||||
|
||||
try:
|
||||
audit = self._create(models.Audit, values)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.AuditAlreadyExists(audit=values['uuid'])
|
||||
return audit
|
||||
try:
|
||||
audit = self._create(models.Audit, values)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.AuditAlreadyExists(audit=values['uuid'])
|
||||
return audit
|
||||
|
||||
def _get_audit(self, context, fieldname, value, eager):
|
||||
try:
|
||||
@@ -722,14 +712,13 @@ class Connection(api.BaseConnection):
|
||||
def destroy_audit(self, audit_id):
|
||||
def is_audit_referenced(session, audit_id):
|
||||
"""Checks whether the audit is referenced by action_plan(s)."""
|
||||
query = model_query(models.ActionPlan, session=session)
|
||||
query = session.query(models.ActionPlan)
|
||||
query = self._add_action_plans_filters(
|
||||
query, {'audit_id': audit_id})
|
||||
return query.count() != 0
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Audit, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Audit)
|
||||
query = add_identity_filter(query, audit_id)
|
||||
|
||||
try:
|
||||
@@ -796,9 +785,8 @@ class Connection(api.BaseConnection):
|
||||
context, fieldname="uuid", value=action_uuid, eager=eager)
|
||||
|
||||
def destroy_action(self, action_id):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Action, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Action)
|
||||
query = add_identity_filter(query, action_id)
|
||||
count = query.delete()
|
||||
if count != 1:
|
||||
@@ -814,9 +802,8 @@ class Connection(api.BaseConnection):
|
||||
|
||||
@staticmethod
|
||||
def _do_update_action(action_id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Action, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Action)
|
||||
query = add_identity_filter(query, action_id)
|
||||
try:
|
||||
ref = query.with_for_update().one()
|
||||
@@ -824,7 +811,7 @@ class Connection(api.BaseConnection):
|
||||
raise exception.ActionNotFound(action=action_id)
|
||||
|
||||
ref.update(values)
|
||||
return ref
|
||||
return ref
|
||||
|
||||
def soft_delete_action(self, action_id):
|
||||
try:
|
||||
@@ -868,14 +855,13 @@ class Connection(api.BaseConnection):
|
||||
def destroy_action_plan(self, action_plan_id):
|
||||
def is_action_plan_referenced(session, action_plan_id):
|
||||
"""Checks whether the action_plan is referenced by action(s)."""
|
||||
query = model_query(models.Action, session=session)
|
||||
query = session.query(models.Action)
|
||||
query = self._add_actions_filters(
|
||||
query, {'action_plan_id': action_plan_id})
|
||||
return query.count() != 0
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.ActionPlan, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.ActionPlan)
|
||||
query = add_identity_filter(query, action_plan_id)
|
||||
|
||||
try:
|
||||
@@ -899,9 +885,8 @@ class Connection(api.BaseConnection):
|
||||
|
||||
@staticmethod
|
||||
def _do_update_action_plan(action_plan_id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.ActionPlan, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.ActionPlan)
|
||||
query = add_identity_filter(query, action_plan_id)
|
||||
try:
|
||||
ref = query.with_for_update().one()
|
||||
@@ -909,7 +894,7 @@ class Connection(api.BaseConnection):
|
||||
raise exception.ActionPlanNotFound(action_plan=action_plan_id)
|
||||
|
||||
ref.update(values)
|
||||
return ref
|
||||
return ref
|
||||
|
||||
def soft_delete_action_plan(self, action_plan_id):
|
||||
try:
|
||||
|
||||
@@ -22,6 +22,7 @@ from apscheduler.jobstores.base import ConflictingIdError
|
||||
from apscheduler.jobstores import sqlalchemy
|
||||
from apscheduler.util import datetime_to_utc_timestamp
|
||||
from apscheduler.util import maybe_ref
|
||||
from apscheduler.util import utc_timestamp_to_datetime
|
||||
|
||||
from watcher.common import context
|
||||
from watcher.common import service
|
||||
@@ -32,7 +33,7 @@ try:
|
||||
except ImportError: # pragma: nocover
|
||||
import pickle
|
||||
|
||||
from sqlalchemy import Table, MetaData, select, and_
|
||||
from sqlalchemy import Table, MetaData, select, and_, null
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
|
||||
@@ -58,8 +59,7 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
super(WatcherJobStore, self).__init__(url, engine, tablename,
|
||||
metadata, pickle_protocol)
|
||||
metadata = maybe_ref(metadata) or MetaData()
|
||||
self.jobs_t = Table(tablename, metadata, autoload=True,
|
||||
autoload_with=engine)
|
||||
self.jobs_t = Table(tablename, metadata, autoload_with=engine)
|
||||
service_ident = service.ServiceHeartbeat.get_service_name()
|
||||
self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
|
||||
self.service_id = objects.Service.list(context=context.make_context(),
|
||||
@@ -79,7 +79,8 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
'tag': jsonutils.dumps(self.tag)
|
||||
})
|
||||
try:
|
||||
self.engine.execute(insert)
|
||||
with self.engine.begin() as conn:
|
||||
conn.execute(insert)
|
||||
except IntegrityError:
|
||||
raise ConflictingIdError(job.id)
|
||||
|
||||
@@ -88,20 +89,36 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def get_next_run_time(self):
|
||||
selectable = select(self.jobs_t.c.next_run_time).\
|
||||
where(self.jobs_t.c.next_run_time != null()).\
|
||||
order_by(self.jobs_t.c.next_run_time).limit(1)
|
||||
with self.engine.begin() as connection:
|
||||
# NOTE(danms): The apscheduler implementation of this gets a
|
||||
# decimal.Decimal back from scalar() which causes
|
||||
# utc_timestamp_to_datetime() to choke since it is expecting a
|
||||
# python float. Assume this is SQLAlchemy 2.0 stuff, so just
|
||||
# coerce to a float here.
|
||||
next_run_time = connection.execute(selectable).scalar()
|
||||
return utc_timestamp_to_datetime(float(next_run_time)
|
||||
if next_run_time is not None
|
||||
else None)
|
||||
|
||||
def _get_jobs(self, *conditions):
|
||||
jobs = []
|
||||
conditions += (self.jobs_t.c.service_id == self.service_id,)
|
||||
selectable = select(
|
||||
[self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag]
|
||||
self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag
|
||||
).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
|
||||
failed_job_ids = set()
|
||||
for row in self.engine.execute(selectable):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(row.job_state))
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
'Unable to restore job "%s" -- removing it', row.id)
|
||||
failed_job_ids.add(row.id)
|
||||
with self.engine.begin() as conn:
|
||||
for row in conn.execute(selectable):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(row.job_state))
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
'Unable to restore job "%s" -- removing it', row.id)
|
||||
failed_job_ids.add(row.id)
|
||||
|
||||
# Remove all the jobs we failed to restore
|
||||
if failed_job_ids:
|
||||
|
||||
@@ -20,9 +20,9 @@ import alembic
|
||||
from alembic import config as alembic_config
|
||||
import alembic.migration as alembic_migration
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
from watcher._i18n import _
|
||||
from watcher.db.sqlalchemy import api as sqla_api
|
||||
from watcher.db.sqlalchemy import models
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ def version(engine=None):
|
||||
:rtype: string
|
||||
"""
|
||||
if engine is None:
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.reader.get_engine()
|
||||
with engine.connect() as conn:
|
||||
context = alembic_migration.MigrationContext.configure(conn)
|
||||
return context.get_current_revision()
|
||||
@@ -63,7 +63,7 @@ def create_schema(config=None, engine=None):
|
||||
Can be used for initial installation instead of upgrade('head').
|
||||
"""
|
||||
if engine is None:
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.writer.get_engine()
|
||||
|
||||
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
||||
# schema, it will only add the new tables, but leave
|
||||
|
||||
@@ -93,14 +93,6 @@ class WatcherBase(models.SoftDeleteMixin,
|
||||
d[c.name] = self[c.name]
|
||||
return d
|
||||
|
||||
def save(self, session=None):
|
||||
import watcher.db.sqlalchemy.api as db_api
|
||||
|
||||
if session is None:
|
||||
session = db_api.get_session()
|
||||
|
||||
super(WatcherBase, self).save(session)
|
||||
|
||||
|
||||
Base = declarative_base(cls=WatcherBase)
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
self._audit_scheduler = scheduling.BackgroundSchedulerService(
|
||||
jobstores={
|
||||
'default': job_store.WatcherJobStore(
|
||||
engine=sq_api.get_engine()),
|
||||
engine=sq_api.enginefacade.writer.get_engine()),
|
||||
}
|
||||
)
|
||||
return self._audit_scheduler
|
||||
|
||||
@@ -57,8 +57,19 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
self.planned_cold_count = 0
|
||||
self.volume_count = 0
|
||||
self.planned_volume_count = 0
|
||||
self.volume_update_count = 0
|
||||
self.planned_volume_update_count = 0
|
||||
|
||||
# TODO(sean-n-mooney) This is backward compatibility
|
||||
# for calling the swap code paths. Swap is now an alias
|
||||
# for migrate, we should clean this up in a future
|
||||
# cycle.
|
||||
@property
|
||||
def volume_update_count(self):
|
||||
return self.volume_count
|
||||
|
||||
# same as above clean up later.
|
||||
@property
|
||||
def planned_volume_update_count(self):
|
||||
return self.planned_volume_count
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
@@ -312,8 +323,8 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
planned_cold_migrate_instance_count=self.planned_cold_count,
|
||||
volume_migrate_count=self.volume_count,
|
||||
planned_volume_migrate_count=self.planned_volume_count,
|
||||
volume_update_count=self.volume_update_count,
|
||||
planned_volume_update_count=self.planned_volume_update_count
|
||||
volume_update_count=self.volume_count,
|
||||
planned_volume_update_count=self.planned_volume_count
|
||||
)
|
||||
|
||||
def set_migration_count(self, targets):
|
||||
@@ -328,10 +339,7 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
elif self.is_cold(instance):
|
||||
self.cold_count += 1
|
||||
for volume in targets.get('volume', []):
|
||||
if self.is_available(volume):
|
||||
self.volume_count += 1
|
||||
elif self.is_in_use(volume):
|
||||
self.volume_update_count += 1
|
||||
self.volume_count += 1
|
||||
|
||||
def is_live(self, instance):
|
||||
status = getattr(instance, 'status')
|
||||
@@ -404,19 +412,16 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
LOG.debug(src_type)
|
||||
LOG.debug("%s %s", dst_pool, dst_type)
|
||||
|
||||
if self.is_available(volume):
|
||||
if src_type == dst_type:
|
||||
self._volume_migrate(volume, dst_pool)
|
||||
else:
|
||||
self._volume_retype(volume, dst_type)
|
||||
elif self.is_in_use(volume):
|
||||
self._volume_update(volume, dst_type)
|
||||
if src_type == dst_type:
|
||||
self._volume_migrate(volume, dst_pool)
|
||||
else:
|
||||
self._volume_retype(volume, dst_type)
|
||||
|
||||
# if with_attached_volume is True, migrate attaching instances
|
||||
if self.with_attached_volume:
|
||||
instances = [self.nova.find_instance(dic.get('server_id'))
|
||||
for dic in volume.attachments]
|
||||
self.instances_migration(instances, action_counter)
|
||||
# if with_attached_volume is True, migrate attaching instances
|
||||
if self.with_attached_volume:
|
||||
instances = [self.nova.find_instance(dic.get('server_id'))
|
||||
for dic in volume.attachments]
|
||||
self.instances_migration(instances, action_counter)
|
||||
|
||||
action_counter.add_pool(pool)
|
||||
|
||||
@@ -464,16 +469,6 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
input_parameters=parameters)
|
||||
self.planned_cold_count += 1
|
||||
|
||||
def _volume_update(self, volume, dst_type):
|
||||
parameters = {"migration_type": "swap",
|
||||
"destination_type": dst_type,
|
||||
"resource_name": volume.name}
|
||||
self.solution.add_action(
|
||||
action_type="volume_migrate",
|
||||
resource_id=volume.id,
|
||||
input_parameters=parameters)
|
||||
self.planned_volume_update_count += 1
|
||||
|
||||
def _volume_migrate(self, volume, dst_pool):
|
||||
parameters = {"migration_type": "migrate",
|
||||
"destination_node": dst_pool,
|
||||
|
||||
@@ -22,7 +22,6 @@ from watcher.common import cinder_helper
|
||||
from watcher.common import clients
|
||||
from watcher.common import keystone_helper
|
||||
from watcher.common import nova_helper
|
||||
from watcher.common import utils as w_utils
|
||||
from watcher.tests import base
|
||||
|
||||
|
||||
@@ -102,12 +101,15 @@ class TestMigration(base.TestCase):
|
||||
|
||||
@staticmethod
|
||||
def fake_volume(**kwargs):
|
||||
# FIXME(sean-k-mooney): we should be using real objects in this
|
||||
# test or at lease something more Representative of the real data
|
||||
volume = mock.MagicMock()
|
||||
volume.id = kwargs.get('id', TestMigration.VOLUME_UUID)
|
||||
volume.size = kwargs.get('size', '1')
|
||||
volume.status = kwargs.get('status', 'available')
|
||||
volume.snapshot_id = kwargs.get('snapshot_id', None)
|
||||
volume.availability_zone = kwargs.get('availability_zone', 'nova')
|
||||
volume.attachments = kwargs.get('attachments', [])
|
||||
return volume
|
||||
|
||||
@staticmethod
|
||||
@@ -175,42 +177,14 @@ class TestMigration(base.TestCase):
|
||||
"storage1-typename",
|
||||
)
|
||||
|
||||
def test_swap_success(self):
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
||||
self.m_n_helper.find_instance.return_value = self.fake_instance()
|
||||
|
||||
new_volume = self.fake_volume(id=w_utils.generate_uuid())
|
||||
user = mock.Mock()
|
||||
session = mock.MagicMock()
|
||||
self.m_k_helper.create_user.return_value = user
|
||||
self.m_k_helper.create_session.return_value = session
|
||||
self.m_c_helper.get_volume.return_value = volume
|
||||
self.m_c_helper.create_volume.return_value = new_volume
|
||||
|
||||
result = self.action_swap.execute()
|
||||
self.assertTrue(result)
|
||||
|
||||
self.m_n_helper.swap_volume.assert_called_once_with(
|
||||
volume,
|
||||
new_volume
|
||||
)
|
||||
self.m_k_helper.delete_user.assert_called_once_with(user)
|
||||
|
||||
def test_swap_fail(self):
|
||||
# _can_swap fail
|
||||
instance = self.fake_instance(status='STOPPED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
|
||||
result = self.action_swap.execute()
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_can_swap_success(self):
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
||||
instance = self.fake_instance()
|
||||
status='in-use', attachments=[
|
||||
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||
|
||||
instance = self.fake_instance()
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertTrue(result)
|
||||
|
||||
@@ -219,16 +193,33 @@ class TestMigration(base.TestCase):
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertTrue(result)
|
||||
|
||||
instance = self.fake_instance(status='RESIZED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_can_swap_fail(self):
|
||||
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
||||
status='in-use', attachments=[
|
||||
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||
instance = self.fake_instance(status='STOPPED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertFalse(result)
|
||||
|
||||
instance = self.fake_instance(status='RESIZED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_swap_success(self):
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[
|
||||
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||
self.m_c_helper.get_volume.return_value = volume
|
||||
|
||||
instance = self.fake_instance()
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
|
||||
result = self.action_swap.execute()
|
||||
self.assertTrue(result)
|
||||
self.m_c_helper.migrate.assert_called_once_with(
|
||||
volume,
|
||||
"storage1-poolname"
|
||||
)
|
||||
|
||||
@@ -17,9 +17,10 @@
|
||||
|
||||
import fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
|
||||
from watcher.db import api as dbapi
|
||||
from watcher.db.sqlalchemy import api as sqla_api
|
||||
from watcher.db.sqlalchemy import migration
|
||||
from watcher.db.sqlalchemy import models
|
||||
from watcher.tests import base
|
||||
@@ -35,16 +36,16 @@ _DB_CACHE = None
|
||||
|
||||
class Database(fixtures.Fixture):
|
||||
|
||||
def __init__(self, db_api, db_migrate, sql_connection):
|
||||
def __init__(self, engine, db_migrate, sql_connection):
|
||||
self.sql_connection = sql_connection
|
||||
|
||||
self.engine = db_api.get_engine()
|
||||
self.engine = engine
|
||||
self.engine.dispose()
|
||||
conn = self.engine.connect()
|
||||
self.setup_sqlite(db_migrate)
|
||||
self.post_migrations()
|
||||
|
||||
self._DB = "".join(line for line in conn.connection.iterdump())
|
||||
with self.engine.connect() as conn:
|
||||
self.setup_sqlite(db_migrate)
|
||||
self.post_migrations()
|
||||
self._DB = "".join(line for line in conn.connection.iterdump())
|
||||
self.engine.dispose()
|
||||
|
||||
def setup_sqlite(self, db_migrate):
|
||||
@@ -55,9 +56,8 @@ class Database(fixtures.Fixture):
|
||||
|
||||
def setUp(self):
|
||||
super(Database, self).setUp()
|
||||
|
||||
conn = self.engine.connect()
|
||||
conn.connection.executescript(self._DB)
|
||||
with self.engine.connect() as conn:
|
||||
conn.connection.executescript(self._DB)
|
||||
self.addCleanup(self.engine.dispose)
|
||||
|
||||
def post_migrations(self):
|
||||
@@ -80,7 +80,9 @@ class DbTestCase(base.TestCase):
|
||||
|
||||
global _DB_CACHE
|
||||
if not _DB_CACHE:
|
||||
_DB_CACHE = Database(sqla_api, migration,
|
||||
engine = enginefacade.writer.get_engine()
|
||||
_DB_CACHE = Database(engine, migration,
|
||||
sql_connection=CONF.database.connection)
|
||||
engine.dispose()
|
||||
self.useFixture(_DB_CACHE)
|
||||
self._id_gen = utils.id_generator()
|
||||
|
||||
@@ -263,7 +263,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
cfg.CONF.set_override("host", "hostname1")
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -286,7 +286,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
self.assertIsNone(self.audits[1].next_run_time)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -309,7 +309,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
|
||||
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -328,7 +328,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
audit_handler.launch_audits_periodically)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -349,7 +349,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
m_add_job.assert_has_calls(calls)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -384,7 +384,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
self.assertTrue(is_inactive)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.AuditStateTransitionManager,
|
||||
'is_inactive')
|
||||
@@ -406,7 +406,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
self.assertIsNotNone(self.audits[0].next_run_time)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
|
||||
@@ -320,7 +320,10 @@ class TestZoneMigration(TestBaseStrategy):
|
||||
migration_types = collections.Counter(
|
||||
[action.get('input_parameters')['migration_type']
|
||||
for action in solution.actions])
|
||||
self.assertEqual(1, migration_types.get("swap", 0))
|
||||
# watcher no longer implements swap. it is now an
|
||||
# alias for migrate.
|
||||
self.assertEqual(0, migration_types.get("swap", 0))
|
||||
self.assertEqual(1, migration_types.get("migrate", 1))
|
||||
global_efficacy_value = solution.global_efficacy[3].get('value', 0)
|
||||
self.assertEqual(100, global_efficacy_value)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user