Compare commits
11 Commits
master
...
stable/202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
456f42e1b9 | ||
|
|
b5b1bc5473 | ||
|
|
c049a533e3 | ||
|
|
dbc06d1504 | ||
|
|
54b3b58428 | ||
|
|
8b0f1dbf66 | ||
|
|
0af13220da | ||
|
|
f85521f3c6 | ||
|
|
238bb50f53 | ||
|
|
db85f32675 | ||
|
|
f6015fd625 |
@@ -2,3 +2,4 @@
|
||||
host=review.opendev.org
|
||||
port=29418
|
||||
project=openstack/watcher.git
|
||||
defaultbranch=stable/2024.2
|
||||
|
||||
@@ -85,6 +85,7 @@
|
||||
vars:
|
||||
tempest_concurrency: 1
|
||||
tempest_test_regex: watcher_tempest_plugin.tests.scenario.test_execute_strategies
|
||||
tempest_exclude_regex: .*\[.*\breal_load\b.*\].*
|
||||
|
||||
- job:
|
||||
name: watcher-tempest-multinode
|
||||
|
||||
@@ -275,6 +275,9 @@ function install_watcherclient {
|
||||
git_clone_by_name "python-watcherclient"
|
||||
setup_dev_lib "python-watcherclient"
|
||||
fi
|
||||
if [[ "$GLOBAL_VENV" == "True" ]]; then
|
||||
sudo ln -sf /opt/stack/data/venv/bin/watcher /usr/local/bin
|
||||
fi
|
||||
}
|
||||
|
||||
# install_watcher() - Collect source and prepare
|
||||
@@ -338,6 +341,19 @@ function stop_watcher {
|
||||
done
|
||||
}
|
||||
|
||||
# configure_tempest_for_watcher() - Configure Tempest for watcher
|
||||
function configure_tempest_for_watcher {
|
||||
# Set default microversion for watcher-tempest-plugin
|
||||
# Please make sure to update this when the microversion is updated, otherwise
|
||||
# new tests may be skipped.
|
||||
TEMPEST_WATCHER_MIN_MICROVERSION=${TEMPEST_WATCHER_MIN_MICROVERSION:-"1.0"}
|
||||
TEMPEST_WATCHER_MAX_MICROVERSION=${TEMPEST_WATCHER_MAX_MICROVERSION:-"1.4"}
|
||||
|
||||
# Set microversion options in tempest.conf
|
||||
iniset $TEMPEST_CONFIG optimize min_microversion $TEMPEST_WATCHER_MIN_MICROVERSION
|
||||
iniset $TEMPEST_CONFIG optimize max_microversion $TEMPEST_WATCHER_MAX_MICROVERSION
|
||||
}
|
||||
|
||||
# Restore xtrace
|
||||
$_XTRACE_WATCHER
|
||||
|
||||
|
||||
@@ -38,6 +38,9 @@ if is_service_enabled watcher-api watcher-decision-engine watcher-applier; then
|
||||
# Start the watcher components
|
||||
echo_summary "Starting watcher"
|
||||
start_watcher
|
||||
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
|
||||
echo_summary "Configuring tempest for watcher"
|
||||
configure_tempest_for_watcher
|
||||
fi
|
||||
|
||||
if [[ "$1" == "unstack" ]]; then
|
||||
|
||||
@@ -70,7 +70,7 @@ then write_uwsgi_config "$WATCHER_UWSGI_CONF" "$WATCHER_UWSGI" "/infra-optim"
|
||||
fi
|
||||
|
||||
# Migrate the database
|
||||
watcher-db-manage upgrade || die $LINO "DB migration error"
|
||||
$WATCHER_BIN_DIR/watcher-db-manage upgrade || die $LINO "DB migration error"
|
||||
|
||||
start_watcher
|
||||
|
||||
|
||||
47
releasenotes/notes/bug-2112187-763bae283e0b736d.yaml
Normal file
47
releasenotes/notes/bug-2112187-763bae283e0b736d.yaml
Normal file
@@ -0,0 +1,47 @@
|
||||
---
|
||||
security:
|
||||
- |
|
||||
Watchers no longer forges requests on behalf of a tenant when
|
||||
swapping volumes. Prior to this release watcher had 2 implementations
|
||||
of moving a volume, it could use cinders volume migrate api or its own
|
||||
internal implementation that directly calls nova volume attachment update
|
||||
api. The former is safe and the recommend way to move volumes between
|
||||
cinder storage backend the internal implementation was insecure, fragile
|
||||
due to a lack of error handling and capable of deleting user data.
|
||||
|
||||
Insecure: the internal volume migration operation created a new keystone
|
||||
user with a weak name and password and added it to the tenants project
|
||||
with the admin role. It then used that user to forge request on behalf
|
||||
of the tenant with admin right to swap the volume. if the applier was
|
||||
restarted during the execution of this operation it would never be cleaned
|
||||
up.
|
||||
|
||||
Fragile: the error handling was minimal, the swap volume api is async
|
||||
so watcher has to poll for completion, there was no support to resume
|
||||
that if interrupted of the time out was exceeded.
|
||||
|
||||
Data-loss: while the internal polling logic returned success or failure
|
||||
watcher did not check the result, once the function returned it
|
||||
unconditionally deleted the source volume. For larger volumes this
|
||||
could result in irretrievable data loss.
|
||||
|
||||
Finally if a volume was swapped using the internal workflow it put
|
||||
the nova instance in an out of sync state. If the VM was live migrated
|
||||
after the swap volume completed successfully prior to a hard reboot
|
||||
then the migration would fail or succeed and break tenant isolation.
|
||||
|
||||
see: https://bugs.launchpad.net/nova/+bug/2112187 for details.
|
||||
fixes:
|
||||
- |
|
||||
All code related to creating keystone user and granting roles has been
|
||||
removed. The internal swap volume implementation has been removed and
|
||||
replaced by cinders volume migrate api. Note as part of this change
|
||||
Watcher will no longer attempt volume migrations or retypes if the
|
||||
instance is in the `Verify Resize` task state. This resolves several
|
||||
issues related to volume migration in the zone migration and
|
||||
Storage capacity balance strategies. While efforts have been made
|
||||
to maintain backward compatibility these changes are required to
|
||||
address a security weakness in watcher's prior approach.
|
||||
|
||||
see: https://bugs.launchpad.net/nova/+bug/2112187 for more context.
|
||||
|
||||
2
tox.ini
2
tox.ini
@@ -8,7 +8,7 @@ basepython = python3
|
||||
usedevelop = True
|
||||
allowlist_externals = find
|
||||
rm
|
||||
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
|
||||
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2024.2} {opts} {packages}
|
||||
setenv =
|
||||
VIRTUAL_ENV={envdir}
|
||||
deps =
|
||||
|
||||
@@ -17,14 +17,11 @@ import jsonschema
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from cinderclient import client as cinder_client
|
||||
from watcher._i18n import _
|
||||
from watcher.applier.actions import base
|
||||
from watcher.common import cinder_helper
|
||||
from watcher.common import exception
|
||||
from watcher.common import keystone_helper
|
||||
from watcher.common import nova_helper
|
||||
from watcher.common import utils
|
||||
from watcher import conf
|
||||
|
||||
CONF = conf.CONF
|
||||
@@ -70,8 +67,6 @@ class VolumeMigrate(base.BaseAction):
|
||||
|
||||
def __init__(self, config, osc=None):
|
||||
super(VolumeMigrate, self).__init__(config)
|
||||
self.temp_username = utils.random_string(10)
|
||||
self.temp_password = utils.random_string(10)
|
||||
self.cinder_util = cinder_helper.CinderHelper(osc=self.osc)
|
||||
self.nova_util = nova_helper.NovaHelper(osc=self.osc)
|
||||
|
||||
@@ -134,83 +129,42 @@ class VolumeMigrate(base.BaseAction):
|
||||
|
||||
def _can_swap(self, volume):
|
||||
"""Judge volume can be swapped"""
|
||||
# TODO(sean-k-mooney): rename this to _can_migrate and update
|
||||
# tests to reflect that.
|
||||
|
||||
# cinder volume migration can migrate volumes that are not
|
||||
# attached to instances or nova can migrate the data for cinder
|
||||
# if the volume is in-use. If the volume has no attachments
|
||||
# allow cinder to decided if it can be migrated.
|
||||
if not volume.attachments:
|
||||
return False
|
||||
instance_id = volume.attachments[0]['server_id']
|
||||
instance_status = self.nova_util.find_instance(instance_id).status
|
||||
|
||||
if (volume.status == 'in-use' and
|
||||
instance_status in ('ACTIVE', 'PAUSED', 'RESIZED')):
|
||||
LOG.debug(f"volume: {volume.id} has no attachments")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _create_user(self, volume, user):
|
||||
"""Create user with volume attribute and user information"""
|
||||
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
|
||||
project_id = getattr(volume, 'os-vol-tenant-attr:tenant_id')
|
||||
user['project'] = project_id
|
||||
user['domain'] = keystone_util.get_project(project_id).domain_id
|
||||
user['roles'] = ['admin']
|
||||
return keystone_util.create_user(user)
|
||||
|
||||
def _get_cinder_client(self, session):
|
||||
"""Get cinder client by session"""
|
||||
return cinder_client.Client(
|
||||
CONF.cinder_client.api_version,
|
||||
session=session,
|
||||
endpoint_type=CONF.cinder_client.endpoint_type)
|
||||
|
||||
def _swap_volume(self, volume, dest_type):
|
||||
"""Swap volume to dest_type
|
||||
|
||||
Limitation note: only for compute libvirt driver
|
||||
"""
|
||||
if not dest_type:
|
||||
raise exception.Invalid(
|
||||
message=(_("destination type is required when "
|
||||
"migration type is swap")))
|
||||
# since it has attachments we need to validate nova's constraints
|
||||
instance_id = volume.attachments[0]['server_id']
|
||||
instance_status = self.nova_util.find_instance(instance_id).status
|
||||
LOG.debug(
|
||||
f"volume: {volume.id} is attached to instance: {instance_id} "
|
||||
f"in instance status: {instance_status}")
|
||||
# NOTE(sean-k-mooney): This used to allow RESIZED which
|
||||
# is the resize_verify task state, that is not an acceptable time
|
||||
# to migrate volumes, if nova does not block this in the API
|
||||
# today that is probably a bug. PAUSED is also questionable but
|
||||
# it should generally be safe.
|
||||
return (volume.status == 'in-use' and
|
||||
instance_status in ('ACTIVE', 'PAUSED'))
|
||||
|
||||
def _migrate(self, volume_id, dest_node, dest_type):
|
||||
try:
|
||||
volume = self.cinder_util.get_volume(volume_id)
|
||||
# for backward compatibility map swap to migrate.
|
||||
if self.migration_type in (self.SWAP, self.MIGRATE):
|
||||
if not self._can_swap(volume):
|
||||
raise exception.Invalid(
|
||||
message=(_("Invalid state for swapping volume")))
|
||||
|
||||
user_info = {
|
||||
'name': self.temp_username,
|
||||
'password': self.temp_password}
|
||||
user = self._create_user(volume, user_info)
|
||||
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
|
||||
try:
|
||||
session = keystone_util.create_session(
|
||||
user.id, self.temp_password)
|
||||
temp_cinder = self._get_cinder_client(session)
|
||||
|
||||
# swap volume
|
||||
new_volume = self.cinder_util.create_volume(
|
||||
temp_cinder, volume, dest_type)
|
||||
self.nova_util.swap_volume(volume, new_volume)
|
||||
|
||||
# delete old volume
|
||||
self.cinder_util.delete_volume(volume)
|
||||
|
||||
finally:
|
||||
keystone_util.delete_user(user)
|
||||
|
||||
return True
|
||||
|
||||
def _migrate(self, volume_id, dest_node, dest_type):
|
||||
|
||||
try:
|
||||
volume = self.cinder_util.get_volume(volume_id)
|
||||
if self.migration_type == self.SWAP:
|
||||
if dest_node:
|
||||
LOG.warning("dest_node is ignored")
|
||||
return self._swap_volume(volume, dest_type)
|
||||
return self.cinder_util.migrate(volume, dest_node)
|
||||
elif self.migration_type == self.RETYPE:
|
||||
return self.cinder_util.retype(volume, dest_type)
|
||||
elif self.migration_type == self.MIGRATE:
|
||||
return self.cinder_util.migrate(volume, dest_node)
|
||||
else:
|
||||
raise exception.Invalid(
|
||||
message=(_("Migration of type '%(migration_type)s' is not "
|
||||
|
||||
@@ -11,12 +11,15 @@
|
||||
# under the License.
|
||||
|
||||
from oslo_context import context
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@enginefacade.transaction_context_provider
|
||||
class RequestContext(context.RequestContext):
|
||||
"""Extends security contexts from the OpenStack common library."""
|
||||
|
||||
|
||||
@@ -15,8 +15,6 @@
|
||||
from oslo_log import log
|
||||
|
||||
from keystoneauth1.exceptions import http as ks_exceptions
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import session
|
||||
from watcher._i18n import _
|
||||
from watcher.common import clients
|
||||
from watcher.common import exception
|
||||
@@ -90,35 +88,3 @@ class KeystoneHelper(object):
|
||||
message=(_("Domain name seems ambiguous: %s") %
|
||||
name_or_id))
|
||||
return domains[0]
|
||||
|
||||
def create_session(self, user_id, password):
|
||||
user = self.get_user(user_id)
|
||||
loader = loading.get_plugin_loader('password')
|
||||
auth = loader.load_from_options(
|
||||
auth_url=CONF.watcher_clients_auth.auth_url,
|
||||
password=password,
|
||||
user_id=user_id,
|
||||
project_id=user.default_project_id)
|
||||
return session.Session(auth=auth)
|
||||
|
||||
def create_user(self, user):
|
||||
project = self.get_project(user['project'])
|
||||
domain = self.get_domain(user['domain'])
|
||||
_user = self.keystone.users.create(
|
||||
user['name'],
|
||||
password=user['password'],
|
||||
domain=domain,
|
||||
project=project,
|
||||
)
|
||||
for role in user['roles']:
|
||||
role = self.get_role(role)
|
||||
self.keystone.roles.grant(
|
||||
role.id, user=_user.id, project=project.id)
|
||||
return _user
|
||||
|
||||
def delete_user(self, user):
|
||||
try:
|
||||
user = self.get_user(user)
|
||||
self.keystone.users.delete(user)
|
||||
except exception.Invalid:
|
||||
pass
|
||||
|
||||
@@ -19,9 +19,7 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import inspect
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
|
||||
from croniter import croniter
|
||||
import eventlet
|
||||
@@ -160,14 +158,10 @@ def extend_with_strict_schema(validator_class):
|
||||
StrictDefaultValidatingDraft4Validator = extend_with_default(
|
||||
extend_with_strict_schema(validators.Draft4Validator))
|
||||
|
||||
|
||||
Draft4Validator = validators.Draft4Validator
|
||||
|
||||
|
||||
def random_string(n):
|
||||
return ''.join([random.choice(
|
||||
string.ascii_letters + string.digits) for i in range(n)])
|
||||
|
||||
|
||||
# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
|
||||
# As a workaround, we're delegating such calls to a native thread.
|
||||
def async_compat_call(f, *args, **kwargs):
|
||||
|
||||
@@ -13,8 +13,8 @@
|
||||
from logging import config as log_config
|
||||
|
||||
from alembic import context
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
from watcher.db.sqlalchemy import api as sqla_api
|
||||
from watcher.db.sqlalchemy import models
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
@@ -43,7 +43,7 @@ def run_migrations_online():
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.writer.get_engine()
|
||||
with engine.connect() as connection:
|
||||
context.configure(connection=connection,
|
||||
target_metadata=target_metadata)
|
||||
|
||||
@@ -19,10 +19,12 @@
|
||||
import collections
|
||||
import datetime
|
||||
import operator
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import session as db_session
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import utils as db_utils
|
||||
from oslo_utils import timeutils
|
||||
from sqlalchemy.inspection import inspect
|
||||
@@ -38,24 +40,7 @@ from watcher import objects
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
_FACADE = None
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
if _FACADE is None:
|
||||
_FACADE = db_session.EngineFacade.from_config(CONF)
|
||||
return _FACADE
|
||||
|
||||
|
||||
def get_engine():
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_engine()
|
||||
|
||||
|
||||
def get_session(**kwargs):
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_session(**kwargs)
|
||||
_CONTEXT = threading.local()
|
||||
|
||||
|
||||
def get_backend():
|
||||
@@ -63,14 +48,15 @@ def get_backend():
|
||||
return Connection()
|
||||
|
||||
|
||||
def model_query(model, *args, **kwargs):
|
||||
"""Query helper for simpler session usage.
|
||||
def _session_for_read():
|
||||
return enginefacade.reader.using(_CONTEXT)
|
||||
|
||||
:param session: if present, the session to use
|
||||
"""
|
||||
session = kwargs.get('session') or get_session()
|
||||
query = session.query(model, *args)
|
||||
return query
|
||||
|
||||
# NOTE(tylerchristie) Please add @oslo_db_api.retry_on_deadlock decorator to
|
||||
# any new methods using _session_for_write (as deadlocks happen on write), so
|
||||
# that oslo_db is able to retry in case of deadlocks.
|
||||
def _session_for_write():
|
||||
return enginefacade.writer.using(_CONTEXT)
|
||||
|
||||
|
||||
def add_identity_filter(query, value):
|
||||
@@ -93,8 +79,6 @@ def add_identity_filter(query, value):
|
||||
|
||||
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||
sort_dir=None, query=None):
|
||||
if not query:
|
||||
query = model_query(model)
|
||||
sort_keys = ['id']
|
||||
if sort_key and sort_key not in sort_keys:
|
||||
sort_keys.insert(0, sort_key)
|
||||
@@ -248,21 +232,20 @@ class Connection(api.BaseConnection):
|
||||
getattr(model, relationship.key)))
|
||||
return query
|
||||
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _create(self, model, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
with _session_for_write() as session:
|
||||
obj = model()
|
||||
cleaned_values = {k: v for k, v in values.items()
|
||||
if k not in self._get_relationships(model)}
|
||||
obj.update(cleaned_values)
|
||||
obj.save(session=session)
|
||||
session.commit()
|
||||
session.add(obj)
|
||||
session.flush()
|
||||
return obj
|
||||
|
||||
def _get(self, context, model, fieldname, value, eager):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_read() as session:
|
||||
query = session.query(model)
|
||||
if eager:
|
||||
query = self._set_eager_options(model, query)
|
||||
|
||||
@@ -278,10 +261,10 @@ class Connection(api.BaseConnection):
|
||||
return obj
|
||||
|
||||
@staticmethod
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _update(model, id_, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(model)
|
||||
query = add_identity_filter(query, id_)
|
||||
try:
|
||||
ref = query.with_for_update().one()
|
||||
@@ -289,13 +272,14 @@ class Connection(api.BaseConnection):
|
||||
raise exception.ResourceNotFound(name=model.__name__, id=id_)
|
||||
|
||||
ref.update(values)
|
||||
|
||||
return ref
|
||||
|
||||
@staticmethod
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _soft_delete(model, id_):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(model)
|
||||
query = add_identity_filter(query, id_)
|
||||
try:
|
||||
row = query.one()
|
||||
@@ -307,10 +291,10 @@ class Connection(api.BaseConnection):
|
||||
return row
|
||||
|
||||
@staticmethod
|
||||
@oslo_db_api.retry_on_deadlock
|
||||
def _destroy(model, id_):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(model, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(model)
|
||||
query = add_identity_filter(query, id_)
|
||||
|
||||
try:
|
||||
@@ -323,7 +307,8 @@ class Connection(api.BaseConnection):
|
||||
def _get_model_list(self, model, add_filters_func, context, filters=None,
|
||||
limit=None, marker=None, sort_key=None, sort_dir=None,
|
||||
eager=False):
|
||||
query = model_query(model)
|
||||
with _session_for_read() as session:
|
||||
query = session.query(model)
|
||||
if eager:
|
||||
query = self._set_eager_options(model, query)
|
||||
query = add_filters_func(query, filters)
|
||||
@@ -425,7 +410,8 @@ class Connection(api.BaseConnection):
|
||||
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
|
||||
|
||||
if 'audit_uuid' in filters:
|
||||
stmt = model_query(models.ActionPlan).join(
|
||||
with _session_for_read() as session:
|
||||
stmt = session.query(models.ActionPlan).join(
|
||||
models.Audit,
|
||||
models.Audit.id == models.ActionPlan.audit_id)\
|
||||
.filter_by(uuid=filters['audit_uuid']).subquery()
|
||||
@@ -607,7 +593,8 @@ class Connection(api.BaseConnection):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = utils.generate_uuid()
|
||||
|
||||
query = model_query(models.AuditTemplate)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.AuditTemplate)
|
||||
query = query.filter_by(name=values.get('name'),
|
||||
deleted_at=None)
|
||||
|
||||
@@ -682,7 +669,8 @@ class Connection(api.BaseConnection):
|
||||
if not values.get('uuid'):
|
||||
values['uuid'] = utils.generate_uuid()
|
||||
|
||||
query = model_query(models.Audit)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Audit)
|
||||
query = query.filter_by(name=values.get('name'),
|
||||
deleted_at=None)
|
||||
|
||||
@@ -724,14 +712,13 @@ class Connection(api.BaseConnection):
|
||||
def destroy_audit(self, audit_id):
|
||||
def is_audit_referenced(session, audit_id):
|
||||
"""Checks whether the audit is referenced by action_plan(s)."""
|
||||
query = model_query(models.ActionPlan, session=session)
|
||||
query = session.query(models.ActionPlan)
|
||||
query = self._add_action_plans_filters(
|
||||
query, {'audit_id': audit_id})
|
||||
return query.count() != 0
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Audit, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Audit)
|
||||
query = add_identity_filter(query, audit_id)
|
||||
|
||||
try:
|
||||
@@ -798,9 +785,8 @@ class Connection(api.BaseConnection):
|
||||
context, fieldname="uuid", value=action_uuid, eager=eager)
|
||||
|
||||
def destroy_action(self, action_id):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Action, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Action)
|
||||
query = add_identity_filter(query, action_id)
|
||||
count = query.delete()
|
||||
if count != 1:
|
||||
@@ -816,9 +802,8 @@ class Connection(api.BaseConnection):
|
||||
|
||||
@staticmethod
|
||||
def _do_update_action(action_id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.Action, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.Action)
|
||||
query = add_identity_filter(query, action_id)
|
||||
try:
|
||||
ref = query.with_for_update().one()
|
||||
@@ -870,14 +855,13 @@ class Connection(api.BaseConnection):
|
||||
def destroy_action_plan(self, action_plan_id):
|
||||
def is_action_plan_referenced(session, action_plan_id):
|
||||
"""Checks whether the action_plan is referenced by action(s)."""
|
||||
query = model_query(models.Action, session=session)
|
||||
query = session.query(models.Action)
|
||||
query = self._add_actions_filters(
|
||||
query, {'action_plan_id': action_plan_id})
|
||||
return query.count() != 0
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.ActionPlan, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.ActionPlan)
|
||||
query = add_identity_filter(query, action_plan_id)
|
||||
|
||||
try:
|
||||
@@ -901,9 +885,8 @@ class Connection(api.BaseConnection):
|
||||
|
||||
@staticmethod
|
||||
def _do_update_action_plan(action_plan_id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = model_query(models.ActionPlan, session=session)
|
||||
with _session_for_write() as session:
|
||||
query = session.query(models.ActionPlan)
|
||||
query = add_identity_filter(query, action_plan_id)
|
||||
try:
|
||||
ref = query.with_for_update().one()
|
||||
|
||||
@@ -20,9 +20,9 @@ import alembic
|
||||
from alembic import config as alembic_config
|
||||
import alembic.migration as alembic_migration
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
from watcher._i18n import _
|
||||
from watcher.db.sqlalchemy import api as sqla_api
|
||||
from watcher.db.sqlalchemy import models
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ def version(engine=None):
|
||||
:rtype: string
|
||||
"""
|
||||
if engine is None:
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.reader.get_engine()
|
||||
with engine.connect() as conn:
|
||||
context = alembic_migration.MigrationContext.configure(conn)
|
||||
return context.get_current_revision()
|
||||
@@ -63,7 +63,7 @@ def create_schema(config=None, engine=None):
|
||||
Can be used for initial installation instead of upgrade('head').
|
||||
"""
|
||||
if engine is None:
|
||||
engine = sqla_api.get_engine()
|
||||
engine = enginefacade.writer.get_engine()
|
||||
|
||||
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
||||
# schema, it will only add the new tables, but leave
|
||||
|
||||
@@ -93,14 +93,6 @@ class WatcherBase(models.SoftDeleteMixin,
|
||||
d[c.name] = self[c.name]
|
||||
return d
|
||||
|
||||
def save(self, session=None):
|
||||
import watcher.db.sqlalchemy.api as db_api
|
||||
|
||||
if session is None:
|
||||
session = db_api.get_session()
|
||||
|
||||
super(WatcherBase, self).save(session)
|
||||
|
||||
|
||||
Base = declarative_base(cls=WatcherBase)
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
self._audit_scheduler = scheduling.BackgroundSchedulerService(
|
||||
jobstores={
|
||||
'default': job_store.WatcherJobStore(
|
||||
engine=sq_api.get_engine()),
|
||||
engine=sq_api.enginefacade.writer.get_engine()),
|
||||
}
|
||||
)
|
||||
return self._audit_scheduler
|
||||
|
||||
@@ -57,8 +57,19 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
self.planned_cold_count = 0
|
||||
self.volume_count = 0
|
||||
self.planned_volume_count = 0
|
||||
self.volume_update_count = 0
|
||||
self.planned_volume_update_count = 0
|
||||
|
||||
# TODO(sean-n-mooney) This is backward compatibility
|
||||
# for calling the swap code paths. Swap is now an alias
|
||||
# for migrate, we should clean this up in a future
|
||||
# cycle.
|
||||
@property
|
||||
def volume_update_count(self):
|
||||
return self.volume_count
|
||||
|
||||
# same as above clean up later.
|
||||
@property
|
||||
def planned_volume_update_count(self):
|
||||
return self.planned_volume_count
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
@@ -312,8 +323,8 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
planned_cold_migrate_instance_count=self.planned_cold_count,
|
||||
volume_migrate_count=self.volume_count,
|
||||
planned_volume_migrate_count=self.planned_volume_count,
|
||||
volume_update_count=self.volume_update_count,
|
||||
planned_volume_update_count=self.planned_volume_update_count
|
||||
volume_update_count=self.volume_count,
|
||||
planned_volume_update_count=self.planned_volume_count
|
||||
)
|
||||
|
||||
def set_migration_count(self, targets):
|
||||
@@ -328,10 +339,7 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
elif self.is_cold(instance):
|
||||
self.cold_count += 1
|
||||
for volume in targets.get('volume', []):
|
||||
if self.is_available(volume):
|
||||
self.volume_count += 1
|
||||
elif self.is_in_use(volume):
|
||||
self.volume_update_count += 1
|
||||
|
||||
def is_live(self, instance):
|
||||
status = getattr(instance, 'status')
|
||||
@@ -404,13 +412,10 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
LOG.debug(src_type)
|
||||
LOG.debug("%s %s", dst_pool, dst_type)
|
||||
|
||||
if self.is_available(volume):
|
||||
if src_type == dst_type:
|
||||
self._volume_migrate(volume, dst_pool)
|
||||
else:
|
||||
self._volume_retype(volume, dst_type)
|
||||
elif self.is_in_use(volume):
|
||||
self._volume_update(volume, dst_type)
|
||||
|
||||
# if with_attached_volume is True, migrate attaching instances
|
||||
if self.with_attached_volume:
|
||||
@@ -464,16 +469,6 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
|
||||
input_parameters=parameters)
|
||||
self.planned_cold_count += 1
|
||||
|
||||
def _volume_update(self, volume, dst_type):
|
||||
parameters = {"migration_type": "swap",
|
||||
"destination_type": dst_type,
|
||||
"resource_name": volume.name}
|
||||
self.solution.add_action(
|
||||
action_type="volume_migrate",
|
||||
resource_id=volume.id,
|
||||
input_parameters=parameters)
|
||||
self.planned_volume_update_count += 1
|
||||
|
||||
def _volume_migrate(self, volume, dst_pool):
|
||||
parameters = {"migration_type": "migrate",
|
||||
"destination_node": dst_pool,
|
||||
|
||||
@@ -22,7 +22,6 @@ from watcher.common import cinder_helper
|
||||
from watcher.common import clients
|
||||
from watcher.common import keystone_helper
|
||||
from watcher.common import nova_helper
|
||||
from watcher.common import utils as w_utils
|
||||
from watcher.tests import base
|
||||
|
||||
|
||||
@@ -102,12 +101,15 @@ class TestMigration(base.TestCase):
|
||||
|
||||
@staticmethod
|
||||
def fake_volume(**kwargs):
|
||||
# FIXME(sean-k-mooney): we should be using real objects in this
|
||||
# test or at lease something more Representative of the real data
|
||||
volume = mock.MagicMock()
|
||||
volume.id = kwargs.get('id', TestMigration.VOLUME_UUID)
|
||||
volume.size = kwargs.get('size', '1')
|
||||
volume.status = kwargs.get('status', 'available')
|
||||
volume.snapshot_id = kwargs.get('snapshot_id', None)
|
||||
volume.availability_zone = kwargs.get('availability_zone', 'nova')
|
||||
volume.attachments = kwargs.get('attachments', [])
|
||||
return volume
|
||||
|
||||
@staticmethod
|
||||
@@ -175,42 +177,14 @@ class TestMigration(base.TestCase):
|
||||
"storage1-typename",
|
||||
)
|
||||
|
||||
def test_swap_success(self):
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
||||
self.m_n_helper.find_instance.return_value = self.fake_instance()
|
||||
|
||||
new_volume = self.fake_volume(id=w_utils.generate_uuid())
|
||||
user = mock.Mock()
|
||||
session = mock.MagicMock()
|
||||
self.m_k_helper.create_user.return_value = user
|
||||
self.m_k_helper.create_session.return_value = session
|
||||
self.m_c_helper.get_volume.return_value = volume
|
||||
self.m_c_helper.create_volume.return_value = new_volume
|
||||
|
||||
result = self.action_swap.execute()
|
||||
self.assertTrue(result)
|
||||
|
||||
self.m_n_helper.swap_volume.assert_called_once_with(
|
||||
volume,
|
||||
new_volume
|
||||
)
|
||||
self.m_k_helper.delete_user.assert_called_once_with(user)
|
||||
|
||||
def test_swap_fail(self):
|
||||
# _can_swap fail
|
||||
instance = self.fake_instance(status='STOPPED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
|
||||
result = self.action_swap.execute()
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_can_swap_success(self):
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
||||
instance = self.fake_instance()
|
||||
status='in-use', attachments=[
|
||||
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||
|
||||
instance = self.fake_instance()
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertTrue(result)
|
||||
|
||||
@@ -219,16 +193,33 @@ class TestMigration(base.TestCase):
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertTrue(result)
|
||||
|
||||
instance = self.fake_instance(status='RESIZED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_can_swap_fail(self):
|
||||
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[{'server_id': 'server_id'}])
|
||||
status='in-use', attachments=[
|
||||
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||
instance = self.fake_instance(status='STOPPED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertFalse(result)
|
||||
|
||||
instance = self.fake_instance(status='RESIZED')
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
result = self.action_swap._can_swap(volume)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_swap_success(self):
|
||||
volume = self.fake_volume(
|
||||
status='in-use', attachments=[
|
||||
{'server_id': TestMigration.INSTANCE_UUID}])
|
||||
self.m_c_helper.get_volume.return_value = volume
|
||||
|
||||
instance = self.fake_instance()
|
||||
self.m_n_helper.find_instance.return_value = instance
|
||||
|
||||
result = self.action_swap.execute()
|
||||
self.assertTrue(result)
|
||||
self.m_c_helper.migrate.assert_called_once_with(
|
||||
volume,
|
||||
"storage1-poolname"
|
||||
)
|
||||
|
||||
@@ -17,9 +17,10 @@
|
||||
|
||||
import fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
|
||||
from watcher.db import api as dbapi
|
||||
from watcher.db.sqlalchemy import api as sqla_api
|
||||
from watcher.db.sqlalchemy import migration
|
||||
from watcher.db.sqlalchemy import models
|
||||
from watcher.tests import base
|
||||
@@ -35,15 +36,15 @@ _DB_CACHE = None
|
||||
|
||||
class Database(fixtures.Fixture):
|
||||
|
||||
def __init__(self, db_api, db_migrate, sql_connection):
|
||||
def __init__(self, engine, db_migrate, sql_connection):
|
||||
self.sql_connection = sql_connection
|
||||
|
||||
self.engine = db_api.get_engine()
|
||||
self.engine = engine
|
||||
self.engine.dispose()
|
||||
conn = self.engine.connect()
|
||||
|
||||
with self.engine.connect() as conn:
|
||||
self.setup_sqlite(db_migrate)
|
||||
self.post_migrations()
|
||||
|
||||
self._DB = "".join(line for line in conn.connection.iterdump())
|
||||
self.engine.dispose()
|
||||
|
||||
@@ -55,8 +56,7 @@ class Database(fixtures.Fixture):
|
||||
|
||||
def setUp(self):
|
||||
super(Database, self).setUp()
|
||||
|
||||
conn = self.engine.connect()
|
||||
with self.engine.connect() as conn:
|
||||
conn.connection.executescript(self._DB)
|
||||
self.addCleanup(self.engine.dispose)
|
||||
|
||||
@@ -80,7 +80,9 @@ class DbTestCase(base.TestCase):
|
||||
|
||||
global _DB_CACHE
|
||||
if not _DB_CACHE:
|
||||
_DB_CACHE = Database(sqla_api, migration,
|
||||
engine = enginefacade.writer.get_engine()
|
||||
_DB_CACHE = Database(engine, migration,
|
||||
sql_connection=CONF.database.connection)
|
||||
engine.dispose()
|
||||
self.useFixture(_DB_CACHE)
|
||||
self._id_gen = utils.id_generator()
|
||||
|
||||
@@ -263,7 +263,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
cfg.CONF.set_override("host", "hostname1")
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -286,7 +286,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
self.assertIsNone(self.audits[1].next_run_time)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -309,7 +309,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
|
||||
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -328,7 +328,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
audit_handler.launch_audits_periodically)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -349,7 +349,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
m_add_job.assert_has_calls(calls)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
@@ -384,7 +384,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
self.assertTrue(is_inactive)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.AuditStateTransitionManager,
|
||||
'is_inactive')
|
||||
@@ -406,7 +406,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
self.assertIsNotNone(self.audits[0].next_run_time)
|
||||
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
|
||||
@@ -320,7 +320,10 @@ class TestZoneMigration(TestBaseStrategy):
|
||||
migration_types = collections.Counter(
|
||||
[action.get('input_parameters')['migration_type']
|
||||
for action in solution.actions])
|
||||
self.assertEqual(1, migration_types.get("swap", 0))
|
||||
# watcher no longer implements swap. it is now an
|
||||
# alias for migrate.
|
||||
self.assertEqual(0, migration_types.get("swap", 0))
|
||||
self.assertEqual(1, migration_types.get("migrate", 1))
|
||||
global_efficacy_value = solution.global_efficacy[3].get('value', 0)
|
||||
self.assertEqual(100, global_efficacy_value)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user