Compare commits

...

9 Commits

Author SHA1 Message Date
Zuul
8f8d537330 Merge "use cinder migrate for swap volume" into stable/2024.1 2025-08-19 19:09:34 +00:00
Douglas Viroel
6264f17c92 Configure watcher tempest's microversion in devstack
Adds a tempest configuration for min and max microversions supported
by watcher. This help us to define the correct range of microversion
to be tested on each stable branch.
New microversion proposals should also increase the default
max_microversion, in order to work with watcher-tempest-plugin
microversion testing.

Change-Id: I0b695ba4530eb89ed17b3935b87e938cadec84cc
(cherry picked from commit adfe3858aa)
(cherry picked from commit defd3953d8)
(cherry picked from commit b5b1bc5473)
Signed-off-by: Douglas Viroel <viroel@gmail.com>
2025-08-19 01:28:16 +00:00
Sean Mooney
4fca7d246a use cinder migrate for swap volume
This change removes watchers in tree functionality
for swapping instance volumes and defines swap as an alias
of cinder volume migrate.

The watcher native implementation was missing error handling
which could lead to irretrievable data loss.

The removed code also forged project user credentials to
perform admin request as if it was done by a member of a project.
this was unsafe an posses a security risk due to how it was
implemented. This code has been removed without replacement.

While some effort has been made to allow existing
audits that were defined to work, any reduction of functionality
as a result of this security hardening is intentional.

Closes-Bug: #2112187
Change-Id: Ic3b6bfd164e272d70fe86d7b182478dd962f8ac0
Signed-off-by: Sean Mooney <work@seanmooney.info>
(cherry picked from commit 3742e0a79c)
(cherry picked from commit ffec800f59)
(cherry picked from commit c049a533e3)
2025-08-18 16:37:50 +00:00
James Page
64ba589f80 Further database refactoring
More refactoring of the SQLAlchemy database layer to improve
compatility with eventlet on newer Pythons.

Inspired by 0ce2c41404

Related-Bug: 2067815
Change-Id: Ib5e9aa288232cc1b766bbf2a8ce2113d5a8e2f7d
(cherry picked from commit 753c44b0c4)
(cherry picked from commit 54b3b58428)
2025-04-24 17:10:26 +02:00
Tobias Urdin
accc7a2a22 Replace deprecated LegacyEngineFacade
LegacyEngineFacade was deprecated in oslo.db 1.12.0 which was released
in 2015.

Change-Id: I5570698262617eae3f48cf29aacf2e23ad541e5f
(cherry picked from commit 5c627a3aa3)
(cherry picked from commit 8b0f1dbf66)
2025-04-24 17:10:07 +02:00
Alfredo Moralejo
70d92a75cc Skip real-data tests in non-real-data jobs
I am excluding strategies execution with annotation `real_load` in
non-real-load jobs.

This is partial backport of [1].

[1] https://review.opendev.org/c/openstack/watcher/+/945627

Modified cherry-pick as there is not prometheus job in 2024.2.

Change-Id: I77d4c23ebc21693bba8ca0247b8954c6dc8eaba9
(cherry picked from commit ce9f0b4c1e)
(cherry picked from commit dbc06d1504)
2025-04-24 17:09:46 +02:00
OpenStack Release Bot
f5ef5557ab Update TOX_CONSTRAINTS_FILE for stable/2024.1
Update the URL to the upper-constraints file to point to the redirect
rule on releases.openstack.org so that anyone working on this branch
will switch to the correct upper-constraints list automatically when
the requirements repository branches.

Until the requirements repository has as stable/2024.1 branch, tests will
continue to use the upper-constraints list on master.

Change-Id: I67913036e99e24d8963452265b48002ca017af3f
2024-03-30 06:58:10 +00:00
OpenStack Release Bot
c604e00b4d Update .gitreview for stable/2024.1
Change-Id: I08eff2b45fc9ddd9a74ebc21256636ec194446d7
2024-03-30 06:58:05 +00:00
James Page
6b433b3547 Fix oslo.db >= 15.0.0 compatibility
Minimal refactor of SQLAlchemy api module to be compatible with
oslo.db >= 15.0.0 where autocommit behaviour was dropped.

Closes-Bug: #2056181
Change-Id: I33be53f647faae2aad30a43c10980df950d5d7c2
(cherry picked from commit bc5922c684)
2024-03-27 15:14:22 +00:00
20 changed files with 283 additions and 334 deletions

View File

@@ -2,3 +2,4 @@
host=review.opendev.org
port=29418
project=openstack/watcher.git
defaultbranch=stable/2024.1

View File

@@ -85,6 +85,7 @@
vars:
tempest_concurrency: 1
tempest_test_regex: watcher_tempest_plugin.tests.scenario.test_execute_strategies
tempest_exclude_regex: .*\[.*\breal_load\b.*\].*
- job:
name: watcher-tempest-multinode

View File

@@ -338,6 +338,19 @@ function stop_watcher {
done
}
# configure_tempest_for_watcher() - Configure Tempest for watcher
function configure_tempest_for_watcher {
# Set default microversion for watcher-tempest-plugin
# Please make sure to update this when the microversion is updated, otherwise
# new tests may be skipped.
TEMPEST_WATCHER_MIN_MICROVERSION=${TEMPEST_WATCHER_MIN_MICROVERSION:-"1.0"}
TEMPEST_WATCHER_MAX_MICROVERSION=${TEMPEST_WATCHER_MAX_MICROVERSION:-"1.4"}
# Set microversion options in tempest.conf
iniset $TEMPEST_CONFIG optimize min_microversion $TEMPEST_WATCHER_MIN_MICROVERSION
iniset $TEMPEST_CONFIG optimize max_microversion $TEMPEST_WATCHER_MAX_MICROVERSION
}
# Restore xtrace
$_XTRACE_WATCHER

View File

@@ -38,6 +38,9 @@ if is_service_enabled watcher-api watcher-decision-engine watcher-applier; then
# Start the watcher components
echo_summary "Starting watcher"
start_watcher
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
echo_summary "Configuring tempest for watcher"
configure_tempest_for_watcher
fi
if [[ "$1" == "unstack" ]]; then

View File

@@ -0,0 +1,47 @@
---
security:
- |
Watchers no longer forges requests on behalf of a tenant when
swapping volumes. Prior to this release watcher had 2 implementations
of moving a volume, it could use cinders volume migrate api or its own
internal implementation that directly calls nova volume attachment update
api. The former is safe and the recommend way to move volumes between
cinder storage backend the internal implementation was insecure, fragile
due to a lack of error handling and capable of deleting user data.
Insecure: the internal volume migration operation created a new keystone
user with a weak name and password and added it to the tenants project
with the admin role. It then used that user to forge request on behalf
of the tenant with admin right to swap the volume. if the applier was
restarted during the execution of this operation it would never be cleaned
up.
Fragile: the error handling was minimal, the swap volume api is async
so watcher has to poll for completion, there was no support to resume
that if interrupted of the time out was exceeded.
Data-loss: while the internal polling logic returned success or failure
watcher did not check the result, once the function returned it
unconditionally deleted the source volume. For larger volumes this
could result in irretrievable data loss.
Finally if a volume was swapped using the internal workflow it put
the nova instance in an out of sync state. If the VM was live migrated
after the swap volume completed successfully prior to a hard reboot
then the migration would fail or succeed and break tenant isolation.
see: https://bugs.launchpad.net/nova/+bug/2112187 for details.
fixes:
- |
All code related to creating keystone user and granting roles has been
removed. The internal swap volume implementation has been removed and
replaced by cinders volume migrate api. Note as part of this change
Watcher will no longer attempt volume migrations or retypes if the
instance is in the `Verify Resize` task state. This resolves several
issues related to volume migration in the zone migration and
Storage capacity balance strategies. While efforts have been made
to maintain backward compatibility these changes are required to
address a security weakness in watcher's prior approach.
see: https://bugs.launchpad.net/nova/+bug/2112187 for more context.

View File

@@ -8,7 +8,7 @@ basepython = python3
usedevelop = True
allowlist_externals = find
rm
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2024.1} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
deps =

View File

@@ -17,14 +17,11 @@ import jsonschema
from oslo_log import log
from cinderclient import client as cinder_client
from watcher._i18n import _
from watcher.applier.actions import base
from watcher.common import cinder_helper
from watcher.common import exception
from watcher.common import keystone_helper
from watcher.common import nova_helper
from watcher.common import utils
from watcher import conf
CONF = conf.CONF
@@ -70,8 +67,6 @@ class VolumeMigrate(base.BaseAction):
def __init__(self, config, osc=None):
super(VolumeMigrate, self).__init__(config)
self.temp_username = utils.random_string(10)
self.temp_password = utils.random_string(10)
self.cinder_util = cinder_helper.CinderHelper(osc=self.osc)
self.nova_util = nova_helper.NovaHelper(osc=self.osc)
@@ -134,83 +129,42 @@ class VolumeMigrate(base.BaseAction):
def _can_swap(self, volume):
"""Judge volume can be swapped"""
# TODO(sean-k-mooney): rename this to _can_migrate and update
# tests to reflect that.
# cinder volume migration can migrate volumes that are not
# attached to instances or nova can migrate the data for cinder
# if the volume is in-use. If the volume has no attachments
# allow cinder to decided if it can be migrated.
if not volume.attachments:
return False
instance_id = volume.attachments[0]['server_id']
instance_status = self.nova_util.find_instance(instance_id).status
if (volume.status == 'in-use' and
instance_status in ('ACTIVE', 'PAUSED', 'RESIZED')):
LOG.debug(f"volume: {volume.id} has no attachments")
return True
return False
def _create_user(self, volume, user):
"""Create user with volume attribute and user information"""
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
project_id = getattr(volume, 'os-vol-tenant-attr:tenant_id')
user['project'] = project_id
user['domain'] = keystone_util.get_project(project_id).domain_id
user['roles'] = ['admin']
return keystone_util.create_user(user)
def _get_cinder_client(self, session):
"""Get cinder client by session"""
return cinder_client.Client(
CONF.cinder_client.api_version,
session=session,
endpoint_type=CONF.cinder_client.endpoint_type)
def _swap_volume(self, volume, dest_type):
"""Swap volume to dest_type
Limitation note: only for compute libvirt driver
"""
if not dest_type:
raise exception.Invalid(
message=(_("destination type is required when "
"migration type is swap")))
# since it has attachments we need to validate nova's constraints
instance_id = volume.attachments[0]['server_id']
instance_status = self.nova_util.find_instance(instance_id).status
LOG.debug(
f"volume: {volume.id} is attached to instance: {instance_id} "
f"in instance status: {instance_status}")
# NOTE(sean-k-mooney): This used to allow RESIZED which
# is the resize_verify task state, that is not an acceptable time
# to migrate volumes, if nova does not block this in the API
# today that is probably a bug. PAUSED is also questionable but
# it should generally be safe.
return (volume.status == 'in-use' and
instance_status in ('ACTIVE', 'PAUSED'))
def _migrate(self, volume_id, dest_node, dest_type):
try:
volume = self.cinder_util.get_volume(volume_id)
# for backward compatibility map swap to migrate.
if self.migration_type in (self.SWAP, self.MIGRATE):
if not self._can_swap(volume):
raise exception.Invalid(
message=(_("Invalid state for swapping volume")))
user_info = {
'name': self.temp_username,
'password': self.temp_password}
user = self._create_user(volume, user_info)
keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
try:
session = keystone_util.create_session(
user.id, self.temp_password)
temp_cinder = self._get_cinder_client(session)
# swap volume
new_volume = self.cinder_util.create_volume(
temp_cinder, volume, dest_type)
self.nova_util.swap_volume(volume, new_volume)
# delete old volume
self.cinder_util.delete_volume(volume)
finally:
keystone_util.delete_user(user)
return True
def _migrate(self, volume_id, dest_node, dest_type):
try:
volume = self.cinder_util.get_volume(volume_id)
if self.migration_type == self.SWAP:
if dest_node:
LOG.warning("dest_node is ignored")
return self._swap_volume(volume, dest_type)
return self.cinder_util.migrate(volume, dest_node)
elif self.migration_type == self.RETYPE:
return self.cinder_util.retype(volume, dest_type)
elif self.migration_type == self.MIGRATE:
return self.cinder_util.migrate(volume, dest_node)
else:
raise exception.Invalid(
message=(_("Migration of type '%(migration_type)s' is not "

View File

@@ -11,12 +11,15 @@
# under the License.
from oslo_context import context
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log
from oslo_utils import timeutils
LOG = log.getLogger(__name__)
@enginefacade.transaction_context_provider
class RequestContext(context.RequestContext):
"""Extends security contexts from the OpenStack common library."""

View File

@@ -15,8 +15,6 @@
from oslo_log import log
from keystoneauth1.exceptions import http as ks_exceptions
from keystoneauth1 import loading
from keystoneauth1 import session
from watcher._i18n import _
from watcher.common import clients
from watcher.common import exception
@@ -90,35 +88,3 @@ class KeystoneHelper(object):
message=(_("Domain name seems ambiguous: %s") %
name_or_id))
return domains[0]
def create_session(self, user_id, password):
user = self.get_user(user_id)
loader = loading.get_plugin_loader('password')
auth = loader.load_from_options(
auth_url=CONF.watcher_clients_auth.auth_url,
password=password,
user_id=user_id,
project_id=user.default_project_id)
return session.Session(auth=auth)
def create_user(self, user):
project = self.get_project(user['project'])
domain = self.get_domain(user['domain'])
_user = self.keystone.users.create(
user['name'],
password=user['password'],
domain=domain,
project=project,
)
for role in user['roles']:
role = self.get_role(role)
self.keystone.roles.grant(
role.id, user=_user.id, project=project.id)
return _user
def delete_user(self, user):
try:
user = self.get_user(user)
self.keystone.users.delete(user)
except exception.Invalid:
pass

View File

@@ -19,9 +19,7 @@
import asyncio
import datetime
import inspect
import random
import re
import string
from croniter import croniter
import eventlet
@@ -160,14 +158,10 @@ def extend_with_strict_schema(validator_class):
StrictDefaultValidatingDraft4Validator = extend_with_default(
extend_with_strict_schema(validators.Draft4Validator))
Draft4Validator = validators.Draft4Validator
def random_string(n):
return ''.join([random.choice(
string.ascii_letters + string.digits) for i in range(n)])
# Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
# As a workaround, we're delegating such calls to a native thread.
def async_compat_call(f, *args, **kwargs):

View File

@@ -13,8 +13,8 @@
from logging import config as log_config
from alembic import context
from oslo_db.sqlalchemy import enginefacade
from watcher.db.sqlalchemy import api as sqla_api
from watcher.db.sqlalchemy import models
# this is the Alembic Config object, which provides
@@ -43,7 +43,7 @@ def run_migrations_online():
and associate a connection with the context.
"""
engine = sqla_api.get_engine()
engine = enginefacade.writer.get_engine()
with engine.connect() as connection:
context.configure(connection=connection,
target_metadata=target_metadata)

View File

@@ -19,10 +19,12 @@
import collections
import datetime
import operator
import threading
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import timeutils
from sqlalchemy.inspection import inspect
@@ -38,28 +40,7 @@ from watcher import objects
CONF = cfg.CONF
_FACADE = None
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
# FIXME(amoralej): Remove autocommit=True (and ideally use of
# LegacyEngineFacade) asap since it's not compatible with SQLAlchemy
# 2.0.
_FACADE = db_session.EngineFacade.from_config(CONF,
autocommit=True)
return _FACADE
def get_engine():
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(**kwargs):
facade = _create_facade_lazily()
return facade.get_session(**kwargs)
_CONTEXT = threading.local()
def get_backend():
@@ -67,14 +48,15 @@ def get_backend():
return Connection()
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
def _session_for_read():
return enginefacade.reader.using(_CONTEXT)
:param session: if present, the session to use
"""
session = kwargs.get('session') or get_session()
query = session.query(model, *args)
return query
# NOTE(tylerchristie) Please add @oslo_db_api.retry_on_deadlock decorator to
# any new methods using _session_for_write (as deadlocks happen on write), so
# that oslo_db is able to retry in case of deadlocks.
def _session_for_write():
return enginefacade.writer.using(_CONTEXT)
def add_identity_filter(query, value):
@@ -97,8 +79,6 @@ def add_identity_filter(query, value):
def _paginate_query(model, limit=None, marker=None, sort_key=None,
sort_dir=None, query=None):
if not query:
query = model_query(model)
sort_keys = ['id']
if sort_key and sort_key not in sort_keys:
sort_keys.insert(0, sort_key)
@@ -251,16 +231,20 @@ class Connection(api.BaseConnection):
query = query.options(joinedload(relationship.key))
return query
@oslo_db_api.retry_on_deadlock
def _create(self, model, values):
with _session_for_write() as session:
obj = model()
cleaned_values = {k: v for k, v in values.items()
if k not in self._get_relationships(model)}
obj.update(cleaned_values)
obj.save()
session.add(obj)
session.flush()
return obj
def _get(self, context, model, fieldname, value, eager):
query = model_query(model)
with _session_for_read() as session:
query = session.query(model)
if eager:
query = self._set_eager_options(model, query)
@@ -276,10 +260,10 @@ class Connection(api.BaseConnection):
return obj
@staticmethod
@oslo_db_api.retry_on_deadlock
def _update(model, id_, values):
session = get_session()
with session.begin():
query = model_query(model, session=session)
with _session_for_write() as session:
query = session.query(model)
query = add_identity_filter(query, id_)
try:
ref = query.with_for_update().one()
@@ -287,13 +271,14 @@ class Connection(api.BaseConnection):
raise exception.ResourceNotFound(name=model.__name__, id=id_)
ref.update(values)
return ref
@staticmethod
@oslo_db_api.retry_on_deadlock
def _soft_delete(model, id_):
session = get_session()
with session.begin():
query = model_query(model, session=session)
with _session_for_write() as session:
query = session.query(model)
query = add_identity_filter(query, id_)
try:
row = query.one()
@@ -305,10 +290,10 @@ class Connection(api.BaseConnection):
return row
@staticmethod
@oslo_db_api.retry_on_deadlock
def _destroy(model, id_):
session = get_session()
with session.begin():
query = model_query(model, session=session)
with _session_for_write() as session:
query = session.query(model)
query = add_identity_filter(query, id_)
try:
@@ -321,7 +306,8 @@ class Connection(api.BaseConnection):
def _get_model_list(self, model, add_filters_func, context, filters=None,
limit=None, marker=None, sort_key=None, sort_dir=None,
eager=False):
query = model_query(model)
with _session_for_read() as session:
query = session.query(model)
if eager:
query = self._set_eager_options(model, query)
query = add_filters_func(query, filters)
@@ -423,7 +409,8 @@ class Connection(api.BaseConnection):
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
if 'audit_uuid' in filters:
stmt = model_query(models.ActionPlan).join(
with _session_for_read() as session:
stmt = session.query(models.ActionPlan).join(
models.Audit,
models.Audit.id == models.ActionPlan.audit_id)\
.filter_by(uuid=filters['audit_uuid']).subquery()
@@ -605,7 +592,8 @@ class Connection(api.BaseConnection):
if not values.get('uuid'):
values['uuid'] = utils.generate_uuid()
query = model_query(models.AuditTemplate)
with _session_for_write() as session:
query = session.query(models.AuditTemplate)
query = query.filter_by(name=values.get('name'),
deleted_at=None)
@@ -680,7 +668,8 @@ class Connection(api.BaseConnection):
if not values.get('uuid'):
values['uuid'] = utils.generate_uuid()
query = model_query(models.Audit)
with _session_for_write() as session:
query = session.query(models.Audit)
query = query.filter_by(name=values.get('name'),
deleted_at=None)
@@ -722,14 +711,13 @@ class Connection(api.BaseConnection):
def destroy_audit(self, audit_id):
def is_audit_referenced(session, audit_id):
"""Checks whether the audit is referenced by action_plan(s)."""
query = model_query(models.ActionPlan, session=session)
query = session.query(models.ActionPlan)
query = self._add_action_plans_filters(
query, {'audit_id': audit_id})
return query.count() != 0
session = get_session()
with session.begin():
query = model_query(models.Audit, session=session)
with _session_for_write() as session:
query = session.query(models.Audit)
query = add_identity_filter(query, audit_id)
try:
@@ -796,9 +784,8 @@ class Connection(api.BaseConnection):
context, fieldname="uuid", value=action_uuid, eager=eager)
def destroy_action(self, action_id):
session = get_session()
with session.begin():
query = model_query(models.Action, session=session)
with _session_for_write() as session:
query = session.query(models.Action)
query = add_identity_filter(query, action_id)
count = query.delete()
if count != 1:
@@ -814,9 +801,8 @@ class Connection(api.BaseConnection):
@staticmethod
def _do_update_action(action_id, values):
session = get_session()
with session.begin():
query = model_query(models.Action, session=session)
with _session_for_write() as session:
query = session.query(models.Action)
query = add_identity_filter(query, action_id)
try:
ref = query.with_for_update().one()
@@ -868,14 +854,13 @@ class Connection(api.BaseConnection):
def destroy_action_plan(self, action_plan_id):
def is_action_plan_referenced(session, action_plan_id):
"""Checks whether the action_plan is referenced by action(s)."""
query = model_query(models.Action, session=session)
query = session.query(models.Action)
query = self._add_actions_filters(
query, {'action_plan_id': action_plan_id})
return query.count() != 0
session = get_session()
with session.begin():
query = model_query(models.ActionPlan, session=session)
with _session_for_write() as session:
query = session.query(models.ActionPlan)
query = add_identity_filter(query, action_plan_id)
try:
@@ -899,9 +884,8 @@ class Connection(api.BaseConnection):
@staticmethod
def _do_update_action_plan(action_plan_id, values):
session = get_session()
with session.begin():
query = model_query(models.ActionPlan, session=session)
with _session_for_write() as session:
query = session.query(models.ActionPlan)
query = add_identity_filter(query, action_plan_id)
try:
ref = query.with_for_update().one()

View File

@@ -20,9 +20,9 @@ import alembic
from alembic import config as alembic_config
import alembic.migration as alembic_migration
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from watcher._i18n import _
from watcher.db.sqlalchemy import api as sqla_api
from watcher.db.sqlalchemy import models
@@ -39,7 +39,7 @@ def version(engine=None):
:rtype: string
"""
if engine is None:
engine = sqla_api.get_engine()
engine = enginefacade.reader.get_engine()
with engine.connect() as conn:
context = alembic_migration.MigrationContext.configure(conn)
return context.get_current_revision()
@@ -63,7 +63,7 @@ def create_schema(config=None, engine=None):
Can be used for initial installation instead of upgrade('head').
"""
if engine is None:
engine = sqla_api.get_engine()
engine = enginefacade.writer.get_engine()
# NOTE(viktors): If we will use metadata.create_all() for non empty db
# schema, it will only add the new tables, but leave

View File

@@ -93,14 +93,6 @@ class WatcherBase(models.SoftDeleteMixin,
d[c.name] = self[c.name]
return d
def save(self, session=None):
import watcher.db.sqlalchemy.api as db_api
if session is None:
session = db_api.get_session()
super(WatcherBase, self).save(session)
Base = declarative_base(cls=WatcherBase)

View File

@@ -52,7 +52,7 @@ class ContinuousAuditHandler(base.AuditHandler):
self._audit_scheduler = scheduling.BackgroundSchedulerService(
jobstores={
'default': job_store.WatcherJobStore(
engine=sq_api.get_engine()),
engine=sq_api.enginefacade.writer.get_engine()),
}
)
return self._audit_scheduler

View File

@@ -57,8 +57,19 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
self.planned_cold_count = 0
self.volume_count = 0
self.planned_volume_count = 0
self.volume_update_count = 0
self.planned_volume_update_count = 0
# TODO(sean-n-mooney) This is backward compatibility
# for calling the swap code paths. Swap is now an alias
# for migrate, we should clean this up in a future
# cycle.
@property
def volume_update_count(self):
return self.volume_count
# same as above clean up later.
@property
def planned_volume_update_count(self):
return self.planned_volume_count
@classmethod
def get_name(cls):
@@ -312,8 +323,8 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
planned_cold_migrate_instance_count=self.planned_cold_count,
volume_migrate_count=self.volume_count,
planned_volume_migrate_count=self.planned_volume_count,
volume_update_count=self.volume_update_count,
planned_volume_update_count=self.planned_volume_update_count
volume_update_count=self.volume_count,
planned_volume_update_count=self.planned_volume_count
)
def set_migration_count(self, targets):
@@ -328,10 +339,7 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
elif self.is_cold(instance):
self.cold_count += 1
for volume in targets.get('volume', []):
if self.is_available(volume):
self.volume_count += 1
elif self.is_in_use(volume):
self.volume_update_count += 1
def is_live(self, instance):
status = getattr(instance, 'status')
@@ -404,13 +412,10 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
LOG.debug(src_type)
LOG.debug("%s %s", dst_pool, dst_type)
if self.is_available(volume):
if src_type == dst_type:
self._volume_migrate(volume, dst_pool)
else:
self._volume_retype(volume, dst_type)
elif self.is_in_use(volume):
self._volume_update(volume, dst_type)
# if with_attached_volume is True, migrate attaching instances
if self.with_attached_volume:
@@ -464,16 +469,6 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
input_parameters=parameters)
self.planned_cold_count += 1
def _volume_update(self, volume, dst_type):
parameters = {"migration_type": "swap",
"destination_type": dst_type,
"resource_name": volume.name}
self.solution.add_action(
action_type="volume_migrate",
resource_id=volume.id,
input_parameters=parameters)
self.planned_volume_update_count += 1
def _volume_migrate(self, volume, dst_pool):
parameters = {"migration_type": "migrate",
"destination_node": dst_pool,

View File

@@ -22,7 +22,6 @@ from watcher.common import cinder_helper
from watcher.common import clients
from watcher.common import keystone_helper
from watcher.common import nova_helper
from watcher.common import utils as w_utils
from watcher.tests import base
@@ -102,12 +101,15 @@ class TestMigration(base.TestCase):
@staticmethod
def fake_volume(**kwargs):
# FIXME(sean-k-mooney): we should be using real objects in this
# test or at lease something more Representative of the real data
volume = mock.MagicMock()
volume.id = kwargs.get('id', TestMigration.VOLUME_UUID)
volume.size = kwargs.get('size', '1')
volume.status = kwargs.get('status', 'available')
volume.snapshot_id = kwargs.get('snapshot_id', None)
volume.availability_zone = kwargs.get('availability_zone', 'nova')
volume.attachments = kwargs.get('attachments', [])
return volume
@staticmethod
@@ -175,42 +177,14 @@ class TestMigration(base.TestCase):
"storage1-typename",
)
def test_swap_success(self):
volume = self.fake_volume(
status='in-use', attachments=[{'server_id': 'server_id'}])
self.m_n_helper.find_instance.return_value = self.fake_instance()
new_volume = self.fake_volume(id=w_utils.generate_uuid())
user = mock.Mock()
session = mock.MagicMock()
self.m_k_helper.create_user.return_value = user
self.m_k_helper.create_session.return_value = session
self.m_c_helper.get_volume.return_value = volume
self.m_c_helper.create_volume.return_value = new_volume
result = self.action_swap.execute()
self.assertTrue(result)
self.m_n_helper.swap_volume.assert_called_once_with(
volume,
new_volume
)
self.m_k_helper.delete_user.assert_called_once_with(user)
def test_swap_fail(self):
# _can_swap fail
instance = self.fake_instance(status='STOPPED')
self.m_n_helper.find_instance.return_value = instance
result = self.action_swap.execute()
self.assertFalse(result)
def test_can_swap_success(self):
volume = self.fake_volume(
status='in-use', attachments=[{'server_id': 'server_id'}])
instance = self.fake_instance()
status='in-use', attachments=[
{'server_id': TestMigration.INSTANCE_UUID}])
instance = self.fake_instance()
self.m_n_helper.find_instance.return_value = instance
result = self.action_swap._can_swap(volume)
self.assertTrue(result)
@@ -219,16 +193,33 @@ class TestMigration(base.TestCase):
result = self.action_swap._can_swap(volume)
self.assertTrue(result)
instance = self.fake_instance(status='RESIZED')
self.m_n_helper.find_instance.return_value = instance
result = self.action_swap._can_swap(volume)
self.assertTrue(result)
def test_can_swap_fail(self):
volume = self.fake_volume(
status='in-use', attachments=[{'server_id': 'server_id'}])
status='in-use', attachments=[
{'server_id': TestMigration.INSTANCE_UUID}])
instance = self.fake_instance(status='STOPPED')
self.m_n_helper.find_instance.return_value = instance
result = self.action_swap._can_swap(volume)
self.assertFalse(result)
instance = self.fake_instance(status='RESIZED')
self.m_n_helper.find_instance.return_value = instance
result = self.action_swap._can_swap(volume)
self.assertFalse(result)
def test_swap_success(self):
volume = self.fake_volume(
status='in-use', attachments=[
{'server_id': TestMigration.INSTANCE_UUID}])
self.m_c_helper.get_volume.return_value = volume
instance = self.fake_instance()
self.m_n_helper.find_instance.return_value = instance
result = self.action_swap.execute()
self.assertTrue(result)
self.m_c_helper.migrate.assert_called_once_with(
volume,
"storage1-poolname"
)

View File

@@ -17,9 +17,10 @@
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from watcher.db import api as dbapi
from watcher.db.sqlalchemy import api as sqla_api
from watcher.db.sqlalchemy import migration
from watcher.db.sqlalchemy import models
from watcher.tests import base
@@ -35,15 +36,15 @@ _DB_CACHE = None
class Database(fixtures.Fixture):
def __init__(self, db_api, db_migrate, sql_connection):
def __init__(self, engine, db_migrate, sql_connection):
self.sql_connection = sql_connection
self.engine = db_api.get_engine()
self.engine = engine
self.engine.dispose()
conn = self.engine.connect()
with self.engine.connect() as conn:
self.setup_sqlite(db_migrate)
self.post_migrations()
self._DB = "".join(line for line in conn.connection.iterdump())
self.engine.dispose()
@@ -55,8 +56,7 @@ class Database(fixtures.Fixture):
def setUp(self):
super(Database, self).setUp()
conn = self.engine.connect()
with self.engine.connect() as conn:
conn.connection.executescript(self._DB)
self.addCleanup(self.engine.dispose)
@@ -80,7 +80,9 @@ class DbTestCase(base.TestCase):
global _DB_CACHE
if not _DB_CACHE:
_DB_CACHE = Database(sqla_api, migration,
engine = enginefacade.writer.get_engine()
_DB_CACHE = Database(engine, migration,
sql_connection=CONF.database.connection)
engine.dispose()
self.useFixture(_DB_CACHE)
self._id_gen = utils.id_generator()

View File

@@ -263,7 +263,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
cfg.CONF.set_override("host", "hostname1")
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
@mock.patch.object(objects.audit.Audit, 'list')
@@ -286,7 +286,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
self.assertIsNone(self.audits[1].next_run_time)
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
@mock.patch.object(objects.audit.Audit, 'list')
@@ -309,7 +309,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
@mock.patch.object(objects.audit.Audit, 'list')
@@ -328,7 +328,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
audit_handler.launch_audits_periodically)
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
@mock.patch.object(objects.audit.Audit, 'list')
@@ -349,7 +349,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
m_add_job.assert_has_calls(calls)
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
@mock.patch.object(objects.audit.Audit, 'list')
@@ -384,7 +384,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
self.assertTrue(is_inactive)
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
@mock.patch.object(objects.audit.AuditStateTransitionManager,
'is_inactive')
@@ -406,7 +406,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
self.assertIsNotNone(self.audits[0].next_run_time)
@mock.patch.object(objects.service.Service, 'list')
@mock.patch.object(sq_api, 'get_engine')
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')

View File

@@ -320,7 +320,10 @@ class TestZoneMigration(TestBaseStrategy):
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("swap", 0))
# watcher no longer implements swap. it is now an
# alias for migrate.
self.assertEqual(0, migration_types.get("swap", 0))
self.assertEqual(1, migration_types.get("migrate", 1))
global_efficacy_value = solution.global_efficacy[3].get('value', 0)
self.assertEqual(100, global_efficacy_value)