Compare commits


14 Commits

Author SHA1 Message Date
Zuul
456f42e1b9 Merge "use cinder migrate for swap volume" into stable/2024.2 2025-08-19 15:03:54 +00:00
Douglas Viroel
b5b1bc5473 Configure watcher tempest's microversion in devstack
Adds a tempest configuration for the min and max microversions supported
by watcher. This helps us define the correct range of microversions
to be tested on each stable branch.
New microversion proposals should also increase the default
max_microversion so that they work with watcher-tempest-plugin
microversion testing.

Change-Id: I0b695ba4530eb89ed17b3935b87e938cadec84cc
(cherry picked from commit adfe3858aa)
(cherry picked from commit defd3953d8)
Signed-off-by: Douglas Viroel <viroel@gmail.com>
2025-08-18 20:31:28 +00:00
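The range check that the commit above configures can be sketched as follows. This is illustrative only; the function names are hypothetical and not the watcher-tempest-plugin API. A test requiring a given microversion runs only if it falls inside the configured [min, max] window:

```python
def _to_tuple(version):
    """Convert a microversion string like '1.4' into a comparable tuple."""
    return tuple(int(part) for part in version.split('.'))

def in_configured_range(required, min_version="1.0", max_version="1.4"):
    """Return True if `required` falls inside the configured range."""
    return _to_tuple(min_version) <= _to_tuple(required) <= _to_tuple(max_version)

print(in_configured_range("1.2"))  # True: inside 1.0..1.4
print(in_configured_range("1.5"))  # False: above the configured maximum
```

Tuple comparison is used rather than string comparison so that, e.g., "1.10" sorts after "1.9".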
Sean Mooney
c049a533e3 use cinder migrate for swap volume
This change removes watcher's in-tree functionality
for swapping instance volumes and redefines swap as an alias
of cinder volume migrate.

The watcher-native implementation lacked error handling,
which could lead to irretrievable data loss.

The removed code also forged project user credentials to
perform admin requests as if they were made by a member of a project.
This was unsafe and posed a security risk due to how it was
implemented. That code has been removed without replacement.

While some effort has been made to keep existing
audit definitions working, any reduction in functionality
resulting from this security hardening is intentional.

Closes-Bug: #2112187
Change-Id: Ic3b6bfd164e272d70fe86d7b182478dd962f8ac0
Signed-off-by: Sean Mooney <work@seanmooney.info>
(cherry picked from commit 3742e0a79c)
(cherry picked from commit ffec800f59)
2025-08-18 16:37:19 +00:00
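The data-loss hazard this commit fixes can be illustrated with a hypothetical polling sketch (the names here are invented, not watcher code): the swap API is asynchronous, so the caller must poll for completion and, crucially, check the final result before deleting the source volume. The removed implementation deleted the source unconditionally:

```python
import time

def wait_for_swap(get_status, timeout=10.0, interval=1.0):
    """Poll get_status() until the operation leaves 'swapping' or times out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status != 'swapping':
            return status
        time.sleep(interval)
    return 'timeout'

def swap_then_delete(get_status, delete_source):
    """Delete the source volume only after the swap verifiably succeeded."""
    status = wait_for_swap(get_status, timeout=1.0, interval=0.01)
    # The crucial check the removed implementation skipped: without it,
    # a failed or timed-out swap still deletes the only good copy.
    if status != 'success':
        raise RuntimeError(f"swap did not complete cleanly: {status}")
    delete_source()

statuses = iter(['swapping', 'success'])
swap_then_delete(statuses.__next__, lambda: print("source volume deleted"))
```

Delegating to cinder's volume migrate API sidesteps this entirely, since cinder owns the operation's state and error handling.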
Alfredo Moralejo
dbc06d1504 Skip real-data tests in non-real-data jobs
Exclude execution of strategies annotated with `real_load` in
non-real-load jobs.

This is a partial backport of [1].

[1] https://review.opendev.org/c/openstack/watcher/+/945627

The cherry-pick was modified because there is no prometheus job in 2024.2.

Change-Id: I77d4c23ebc21693bba8ca0247b8954c6dc8eaba9
(cherry picked from commit ce9f0b4c1e)
2025-04-24 17:07:17 +02:00
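The exclusion is driven by a `tempest_exclude_regex` in the zuul job definition (shown in the diffs below). A quick sketch of how that pattern classifies test ids; the ids here are made up for illustration:

```python
import re

# Pattern taken from the zuul job change in this compare.
EXCLUDE = re.compile(r'.*\[.*\breal_load\b.*\].*')

# Hypothetical test ids for illustration only.
annotated = "test_execute_basic_strategy[id-123,real_load]"
plain = "test_execute_dummy_strategy[id-456]"

print(bool(EXCLUDE.match(annotated)))  # True: excluded from the job
print(bool(EXCLUDE.match(plain)))      # False: still runs
```

The `\b` word boundaries keep the pattern from matching tags that merely contain `real_load` as a substring, and the surrounding `\[...\]` restricts it to the bracketed annotation section of the test id.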
James Page
54b3b58428 Further database refactoring
More refactoring of the SQLAlchemy database layer to improve
compatibility with eventlet on newer Pythons.

Inspired by 0ce2c41404

Related-Bug: 2067815
Change-Id: Ib5e9aa288232cc1b766bbf2a8ce2113d5a8e2f7d
(cherry picked from commit 753c44b0c4)
2025-02-27 16:15:34 +01:00
Tobias Urdin
8b0f1dbf66 Replace deprecated LegacyEngineFacade
LegacyEngineFacade was deprecated in oslo.db 1.12.0, which was released
in 2015.

Change-Id: I5570698262617eae3f48cf29aacf2e23ad541e5f
(cherry picked from commit 5c627a3aa3)
2025-02-27 16:15:29 +01:00
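The modern replacement for LegacyEngineFacade is oslo.db's enginefacade, which hands out scoped reader/writer transaction contexts instead of raw sessions. The stdlib-only sketch below mimics the shape of that pattern with sqlite3; it is an illustration of the design, not oslo.db itself:

```python
# Stdlib mimic of the enginefacade pattern: callers enter scoped
# read/write context managers that own commit/rollback, rather than
# being handed a raw session via a get_session() helper.
import contextlib
import sqlite3

_CONN = sqlite3.connect(":memory:")

@contextlib.contextmanager
def session_for_write():
    """Commit on success, roll back on error, like enginefacade.writer."""
    try:
        yield _CONN
        _CONN.commit()
    except Exception:
        _CONN.rollback()
        raise

@contextlib.contextmanager
def session_for_read():
    """Reads need no commit; the connection is handed out scoped."""
    yield _CONN

with session_for_write() as session:
    session.execute("CREATE TABLE audit (uuid TEXT)")
    session.execute("INSERT INTO audit VALUES ('abc')")

with session_for_read() as session:
    print(session.execute("SELECT uuid FROM audit").fetchone()[0])  # abc
```

The later "use cinder migrate" backport's database diff (at the bottom of this compare) shows the real equivalents: `_session_for_read()`/`_session_for_write()` wrapping `enginefacade.reader.using()` and `enginefacade.writer.using()`.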
Zuul
0af13220da Merge "Update TOX_CONSTRAINTS_FILE for stable/2024.2" into stable/2024.2 2025-01-10 00:25:05 +00:00
Zuul
f85521f3c6 Merge "Update .gitreview for stable/2024.2" into stable/2024.2 2025-01-10 00:25:04 +00:00
Ghanshyam Mann
238bb50f53 Run watcher-db-manage in grenade testing from venv
Grenade now installs and runs everything from a virtual env:

- https://review.opendev.org/c/openstack/grenade/+/930507

watcher-db-manage in the watcher grenade job needs to be run from that
venv as well, not from the system level; otherwise it fails with the
error below:
- https://zuul.opendev.org/t/openstack/build/02c3bd4814ea4d0580f7dfd346416425/log/controller/logs/grenade.sh_log.txt

Change-Id: I73e94222c89c6a12a6006d42637cd194a09005ac
2024-10-22 20:59:34 +00:00
OpenStack Release Bot
db85f32675 Update TOX_CONSTRAINTS_FILE for stable/2024.2
Update the URL to the upper-constraints file to point to the redirect
rule on releases.openstack.org so that anyone working on this branch
will switch to the correct upper-constraints list automatically when
the requirements repository branches.

Until the requirements repository has a stable/2024.2 branch, tests will
continue to use the upper-constraints list on master.

Change-Id: I86a0c943504164832c5b581d8a3bc03b35cd023e
2024-09-16 10:05:12 +00:00
OpenStack Release Bot
f6015fd625 Update .gitreview for stable/2024.2
Change-Id: I2e839b029d934cd9dc1ae6febe4f51d4eb9fe615
2024-09-16 10:05:11 +00:00
OpenStack Proposal Bot
a9dc3794a6 Imported Translations from Zanata
For more information about this automatic import see:
https://docs.openstack.org/i18n/latest/reviewing-translation-import.html

Change-Id: I2b2afb0c0e590b737871bf4c43293df2ed88e534
2024-06-01 02:47:52 +00:00
Takashi Kajinami
d6f169197e SQLAlchemy 2.0: Omnibus fixes patch
This was originally five patches, but they are all needed to pass
any of the test jobs now, so they have been squashed into one:

Co-Authored-By: Dan Smith <dms@danplanet.com>

First:

The autoload argument was removed[1] in SQLAlchemy and only
the autoload_with argument should be passed.

The autoload argument is set automatically from the autoload_with
argument even in SQLAlchemy 1.x[2], so it is not needed at all.

[1] c932123bac
[2] ad8f921e96

Second:

Remove _warn_on_bytestring for newer SQLAlchemy. AFAICT this flag has
been removed from SQLAlchemy, and that is why watcher-db-manage fails to
initialize the DB for me on jammy. The migration was passing the
default value (=False) anyway, so I assume this is the right "fix".

Third:

Fix joinedload passing string attribute names

Fourth:

Fix engine.select pattern to use begin() per the migration guide.

Fifth:

Override the apscheduler get_next_run_time(), which appears to be
trivially incompatible with SQLAlchemy 2.0 because of the return type
of scalar().

Change-Id: I000e5e78f97f82ed4ea64d42f1c38354c3252e08
2024-05-29 06:49:32 -07:00
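The third fix above reflects SQLAlchemy 2.0 removing support for passing relationship names as strings to loader options: `joinedload("goal")` must become `joinedload(Model.goal)`, or `joinedload(getattr(Model, name))` when the attribute name is only known at runtime, as in watcher's `_set_eager_options`. A minimal sketch, assuming a toy model (Goal/Strategy here are invented stand-ins; requires SQLAlchemy 1.4+):

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, joinedload, relationship

Base = declarative_base()

class Goal(Base):
    __tablename__ = "goal"
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Strategy(Base):
    __tablename__ = "strategy"
    id = Column(Integer, primary_key=True)
    goal_id = Column(Integer, ForeignKey("goal.id"))
    goal = relationship(Goal)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Strategy(goal=Goal(name="dummy")))
    session.commit()

with Session(engine) as session:
    # 2.0-compatible: pass the mapped attribute, not the string "goal".
    # getattr() covers the dynamic case used in _set_eager_options.
    query = session.query(Strategy).options(
        joinedload(getattr(Strategy, "goal")))
    print(query.one().goal.name)  # dummy
```

The same dynamic-attribute form appears in the database diff at the bottom of this compare.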
James Page
bc5922c684 Fix oslo.db >= 15.0.0 compatibility
Minimal refactor of the SQLAlchemy api module to be compatible with
oslo.db >= 15.0.0, where autocommit behaviour was dropped.

Closes-Bug: #2056181
Change-Id: I33be53f647faae2aad30a43c10980df950d5d7c2
2024-03-27 09:41:23 +00:00
24 changed files with 325 additions and 351 deletions

View File

@@ -2,3 +2,4 @@
 host=review.opendev.org
 port=29418
 project=openstack/watcher.git
+defaultbranch=stable/2024.2

View File

@@ -85,6 +85,7 @@
     vars:
       tempest_concurrency: 1
       tempest_test_regex: watcher_tempest_plugin.tests.scenario.test_execute_strategies
+      tempest_exclude_regex: .*\[.*\breal_load\b.*\].*

 - job:
     name: watcher-tempest-multinode

View File

@@ -275,6 +275,9 @@ function install_watcherclient {
         git_clone_by_name "python-watcherclient"
         setup_dev_lib "python-watcherclient"
     fi
+    if [[ "$GLOBAL_VENV" == "True" ]]; then
+        sudo ln -sf /opt/stack/data/venv/bin/watcher /usr/local/bin
+    fi
 }

 # install_watcher() - Collect source and prepare
@@ -338,6 +341,19 @@ function stop_watcher {
     done
 }

+# configure_tempest_for_watcher() - Configure Tempest for watcher
+function configure_tempest_for_watcher {
+    # Set default microversion for watcher-tempest-plugin
+    # Please make sure to update this when the microversion is updated, otherwise
+    # new tests may be skipped.
+    TEMPEST_WATCHER_MIN_MICROVERSION=${TEMPEST_WATCHER_MIN_MICROVERSION:-"1.0"}
+    TEMPEST_WATCHER_MAX_MICROVERSION=${TEMPEST_WATCHER_MAX_MICROVERSION:-"1.4"}
+
+    # Set microversion options in tempest.conf
+    iniset $TEMPEST_CONFIG optimize min_microversion $TEMPEST_WATCHER_MIN_MICROVERSION
+    iniset $TEMPEST_CONFIG optimize max_microversion $TEMPEST_WATCHER_MAX_MICROVERSION
+}

 # Restore xtrace
 $_XTRACE_WATCHER

View File

@@ -38,6 +38,9 @@ if is_service_enabled watcher-api watcher-decision-engine watcher-applier; then
         # Start the watcher components
         echo_summary "Starting watcher"
         start_watcher
+    elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
+        echo_summary "Configuring tempest for watcher"
+        configure_tempest_for_watcher
     fi

     if [[ "$1" == "unstack" ]]; then

View File

@@ -70,7 +70,7 @@
     then write_uwsgi_config "$WATCHER_UWSGI_CONF" "$WATCHER_UWSGI" "/infra-optim"
 fi

 # Migrate the database
-watcher-db-manage upgrade || die $LINO "DB migration error"
+$WATCHER_BIN_DIR/watcher-db-manage upgrade || die $LINO "DB migration error"

 start_watcher

View File

@@ -0,0 +1,47 @@
---
security:
  - |
    Watcher no longer forges requests on behalf of a tenant when
    swapping volumes. Prior to this release, watcher had two
    implementations for moving a volume: it could use cinder's volume
    migrate API, or its own internal implementation that directly called
    nova's volume attachment update API. The former is safe and the
    recommended way to move volumes between cinder storage backends; the
    internal implementation was insecure, fragile due to a lack of error
    handling, and capable of deleting user data.
    Insecure: the internal volume migration operation created a new
    keystone user with a weak name and password and added it to the
    tenant's project with the admin role. It then used that user to
    forge requests on behalf of the tenant, with admin rights, to swap
    the volume. If the applier was restarted during the execution of
    this operation, that user would never be cleaned up.
    Fragile: the error handling was minimal. The swap volume API is
    async, so watcher has to poll for completion, and there was no
    support for resuming if the operation was interrupted or the
    timeout was exceeded.
    Data loss: while the internal polling logic returned success or
    failure, watcher did not check the result; once the function
    returned, it unconditionally deleted the source volume. For larger
    volumes this could result in irretrievable data loss.
    Finally, swapping a volume using the internal workflow put the nova
    instance in an out-of-sync state. If the VM was live migrated after
    the swap completed successfully, but prior to a hard reboot, the
    migration would either fail, or succeed and break tenant isolation.
    See https://bugs.launchpad.net/nova/+bug/2112187 for details.
fixes:
  - |
    All code related to creating keystone users and granting roles has
    been removed. The internal swap volume implementation has been
    removed and replaced by cinder's volume migrate API. Note that as
    part of this change watcher will no longer attempt volume
    migrations or retypes if the instance is in the `Verify Resize`
    task state. This resolves several issues related to volume
    migration in the zone migration and storage capacity balance
    strategies. While efforts have been made to maintain backward
    compatibility, these changes are required to address a security
    weakness in watcher's prior approach.
    See https://bugs.launchpad.net/nova/+bug/2112187 for more context.

View File

@@ -3,15 +3,16 @@
 # Andi Chandler <andi@gowling.com>, 2020. #zanata
 # Andi Chandler <andi@gowling.com>, 2022. #zanata
 # Andi Chandler <andi@gowling.com>, 2023. #zanata
+# Andi Chandler <andi@gowling.com>, 2024. #zanata
 msgid ""
 msgstr ""
 "Project-Id-Version: python-watcher\n"
 "Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2023-08-14 03:05+0000\n"
+"POT-Creation-Date: 2024-05-31 14:40+0000\n"
 "MIME-Version: 1.0\n"
 "Content-Type: text/plain; charset=UTF-8\n"
 "Content-Transfer-Encoding: 8bit\n"
-"PO-Revision-Date: 2023-06-21 07:54+0000\n"
+"PO-Revision-Date: 2024-04-18 12:21+0000\n"
 "Last-Translator: Andi Chandler <andi@gowling.com>\n"
 "Language-Team: English (United Kingdom)\n"
 "Language: en_GB\n"
@@ -63,6 +64,9 @@ msgstr "2.0.0"
 msgid "2023.1 Series Release Notes"
 msgstr "2023.1 Series Release Notes"
+msgid "2023.2 Series Release Notes"
+msgstr "2023.2 Series Release Notes"
 msgid "3.0.0"
 msgstr "3.0.0"

View File

@@ -8,7 +8,7 @@ basepython = python3
 usedevelop = True
 allowlist_externals = find
     rm
-install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
+install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2024.2} {opts} {packages}
 setenv =
     VIRTUAL_ENV={envdir}
 deps =

View File

@@ -17,14 +17,11 @@ import jsonschema
 from oslo_log import log

-from cinderclient import client as cinder_client
 from watcher._i18n import _
 from watcher.applier.actions import base
 from watcher.common import cinder_helper
 from watcher.common import exception
-from watcher.common import keystone_helper
 from watcher.common import nova_helper
-from watcher.common import utils
 from watcher import conf

 CONF = conf.CONF
@@ -70,8 +67,6 @@ class VolumeMigrate(base.BaseAction):
     def __init__(self, config, osc=None):
         super(VolumeMigrate, self).__init__(config)
-        self.temp_username = utils.random_string(10)
-        self.temp_password = utils.random_string(10)
         self.cinder_util = cinder_helper.CinderHelper(osc=self.osc)
         self.nova_util = nova_helper.NovaHelper(osc=self.osc)
@@ -134,83 +129,42 @@
     def _can_swap(self, volume):
         """Judge volume can be swapped"""
+        # TODO(sean-k-mooney): rename this to _can_migrate and update
+        # tests to reflect that.
+        # cinder volume migration can migrate volumes that are not
+        # attached to instances or nova can migrate the data for cinder
+        # if the volume is in-use. If the volume has no attachments
+        # allow cinder to decided if it can be migrated.
         if not volume.attachments:
-            return False
-        instance_id = volume.attachments[0]['server_id']
-        instance_status = self.nova_util.find_instance(instance_id).status
-        if (volume.status == 'in-use' and
-                instance_status in ('ACTIVE', 'PAUSED', 'RESIZED')):
+            LOG.debug(f"volume: {volume.id} has no attachments")
             return True
-        return False
-
-    def _create_user(self, volume, user):
-        """Create user with volume attribute and user information"""
-        keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
-        project_id = getattr(volume, 'os-vol-tenant-attr:tenant_id')
-        user['project'] = project_id
-        user['domain'] = keystone_util.get_project(project_id).domain_id
-        user['roles'] = ['admin']
-        return keystone_util.create_user(user)
-
-    def _get_cinder_client(self, session):
-        """Get cinder client by session"""
-        return cinder_client.Client(
-            CONF.cinder_client.api_version,
-            session=session,
-            endpoint_type=CONF.cinder_client.endpoint_type)
-
-    def _swap_volume(self, volume, dest_type):
-        """Swap volume to dest_type
-
-        Limitation note: only for compute libvirt driver
-        """
-        if not dest_type:
-            raise exception.Invalid(
-                message=(_("destination type is required when "
-                           "migration type is swap")))
-
-        if not self._can_swap(volume):
-            raise exception.Invalid(
-                message=(_("Invalid state for swapping volume")))
-
-        user_info = {
-            'name': self.temp_username,
-            'password': self.temp_password}
-        user = self._create_user(volume, user_info)
-        keystone_util = keystone_helper.KeystoneHelper(osc=self.osc)
-        try:
-            session = keystone_util.create_session(
-                user.id, self.temp_password)
-            temp_cinder = self._get_cinder_client(session)
-
-            # swap volume
-            new_volume = self.cinder_util.create_volume(
-                temp_cinder, volume, dest_type)
-            self.nova_util.swap_volume(volume, new_volume)
-
-            # delete old volume
-            self.cinder_util.delete_volume(volume)
-        finally:
-            keystone_util.delete_user(user)
-
-        return True
+        # since it has attachments we need to validate nova's constraints
+        instance_id = volume.attachments[0]['server_id']
+        instance_status = self.nova_util.find_instance(instance_id).status
+        LOG.debug(
+            f"volume: {volume.id} is attached to instance: {instance_id} "
+            f"in instance status: {instance_status}")
+        # NOTE(sean-k-mooney): This used to allow RESIZED which
+        # is the resize_verify task state, that is not an acceptable time
+        # to migrate volumes, if nova does not block this in the API
+        # today that is probably a bug. PAUSED is also questionable but
+        # it should generally be safe.
+        return (volume.status == 'in-use' and
                instance_status in ('ACTIVE', 'PAUSED'))

     def _migrate(self, volume_id, dest_node, dest_type):
         try:
             volume = self.cinder_util.get_volume(volume_id)
-            if self.migration_type == self.SWAP:
-                if dest_node:
-                    LOG.warning("dest_node is ignored")
-                return self._swap_volume(volume, dest_type)
+            # for backward compatibility map swap to migrate.
+            if self.migration_type in (self.SWAP, self.MIGRATE):
+                if not self._can_swap(volume):
+                    raise exception.Invalid(
+                        message=(_("Invalid state for swapping volume")))
+                return self.cinder_util.migrate(volume, dest_node)
             elif self.migration_type == self.RETYPE:
                 return self.cinder_util.retype(volume, dest_type)
-            elif self.migration_type == self.MIGRATE:
-                return self.cinder_util.migrate(volume, dest_node)
             else:
                 raise exception.Invalid(
                     message=(_("Migration of type '%(migration_type)s' is not "

View File

@@ -11,12 +11,15 @@
 # under the License.

 from oslo_context import context
+from oslo_db.sqlalchemy import enginefacade
 from oslo_log import log
 from oslo_utils import timeutils

 LOG = log.getLogger(__name__)

+@enginefacade.transaction_context_provider
 class RequestContext(context.RequestContext):
     """Extends security contexts from the OpenStack common library."""

View File

@@ -15,8 +15,6 @@
 from oslo_log import log

 from keystoneauth1.exceptions import http as ks_exceptions
-from keystoneauth1 import loading
-from keystoneauth1 import session
 from watcher._i18n import _
 from watcher.common import clients
 from watcher.common import exception
@@ -90,35 +88,3 @@ class KeystoneHelper(object):
                 message=(_("Domain name seems ambiguous: %s") %
                          name_or_id))
         return domains[0]
-
-    def create_session(self, user_id, password):
-        user = self.get_user(user_id)
-        loader = loading.get_plugin_loader('password')
-        auth = loader.load_from_options(
-            auth_url=CONF.watcher_clients_auth.auth_url,
-            password=password,
-            user_id=user_id,
-            project_id=user.default_project_id)
-        return session.Session(auth=auth)
-
-    def create_user(self, user):
-        project = self.get_project(user['project'])
-        domain = self.get_domain(user['domain'])
-        _user = self.keystone.users.create(
-            user['name'],
-            password=user['password'],
-            domain=domain,
-            project=project,
-        )
-        for role in user['roles']:
-            role = self.get_role(role)
-            self.keystone.roles.grant(
-                role.id, user=_user.id, project=project.id)
-        return _user
-
-    def delete_user(self, user):
-        try:
-            user = self.get_user(user)
-            self.keystone.users.delete(user)
-        except exception.Invalid:
-            pass

View File

@@ -19,9 +19,7 @@
 import asyncio
 import datetime
 import inspect
-import random
 import re
-import string

 from croniter import croniter
 import eventlet
@@ -160,14 +158,10 @@ def extend_with_strict_schema(validator_class):
 StrictDefaultValidatingDraft4Validator = extend_with_default(
     extend_with_strict_schema(validators.Draft4Validator))
 Draft4Validator = validators.Draft4Validator

-def random_string(n):
-    return ''.join([random.choice(
-        string.ascii_letters + string.digits) for i in range(n)])

 # Some clients (e.g. MAAS) use asyncio, which isn't compatible with Eventlet.
 # As a workaround, we're delegating such calls to a native thread.
 def async_compat_call(f, *args, **kwargs):

View File

@@ -13,8 +13,8 @@
 from logging import config as log_config

 from alembic import context
+from oslo_db.sqlalchemy import enginefacade

-from watcher.db.sqlalchemy import api as sqla_api
 from watcher.db.sqlalchemy import models

 # this is the Alembic Config object, which provides
@@ -43,7 +43,7 @@ def run_migrations_online():
     and associate a connection with the context.

     """
-    engine = sqla_api.get_engine()
+    engine = enginefacade.writer.get_engine()
     with engine.connect() as connection:
         context.configure(connection=connection,
                           target_metadata=target_metadata)

View File

@@ -28,7 +28,7 @@ def upgrade():
     op.create_table(
         'apscheduler_jobs',
-        sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False),
+        sa.Column('id', sa.Unicode(191),
                   nullable=False),
         sa.Column('next_run_time', sa.Float(25), index=True),
         sa.Column('job_state', sa.LargeBinary, nullable=False),

View File

@@ -19,10 +19,12 @@
 import collections
 import datetime
 import operator
+import threading

 from oslo_config import cfg
+from oslo_db import api as oslo_db_api
 from oslo_db import exception as db_exc
-from oslo_db.sqlalchemy import session as db_session
+from oslo_db.sqlalchemy import enginefacade
 from oslo_db.sqlalchemy import utils as db_utils
 from oslo_utils import timeutils
 from sqlalchemy.inspection import inspect
@@ -38,28 +40,7 @@ from watcher import objects
 CONF = cfg.CONF

-_FACADE = None
+_CONTEXT = threading.local()

-def _create_facade_lazily():
-    global _FACADE
-    if _FACADE is None:
-        # FIXME(amoralej): Remove autocommit=True (and ideally use of
-        # LegacyEngineFacade) asap since it's not compatible with SQLAlchemy
-        # 2.0.
-        _FACADE = db_session.EngineFacade.from_config(CONF,
-                                                      autocommit=True)
-    return _FACADE
-
-def get_engine():
-    facade = _create_facade_lazily()
-    return facade.get_engine()
-
-def get_session(**kwargs):
-    facade = _create_facade_lazily()
-    return facade.get_session(**kwargs)

 def get_backend():
@@ -67,14 +48,15 @@ def get_backend():
     return Connection()

-def model_query(model, *args, **kwargs):
-    """Query helper for simpler session usage.
-
-    :param session: if present, the session to use
-    """
-    session = kwargs.get('session') or get_session()
-    query = session.query(model, *args)
-    return query
+def _session_for_read():
+    return enginefacade.reader.using(_CONTEXT)
+
+# NOTE(tylerchristie) Please add @oslo_db_api.retry_on_deadlock decorator to
+# any new methods using _session_for_write (as deadlocks happen on write), so
+# that oslo_db is able to retry in case of deadlocks.
+def _session_for_write():
+    return enginefacade.writer.using(_CONTEXT)

 def add_identity_filter(query, value):
@@ -97,8 +79,6 @@ def add_identity_filter(query, value):
 def _paginate_query(model, limit=None, marker=None, sort_key=None,
                     sort_dir=None, query=None):
-    if not query:
-        query = model_query(model)
     sort_keys = ['id']
     if sort_key and sort_key not in sort_keys:
         sort_keys.insert(0, sort_key)
@@ -248,38 +228,43 @@ class Connection(api.BaseConnection):
         for relationship in relationships:
             if not relationship.uselist:
                 # We have a One-to-X relationship
-                query = query.options(joinedload(relationship.key))
+                query = query.options(joinedload(
+                    getattr(model, relationship.key)))
         return query

+    @oslo_db_api.retry_on_deadlock
     def _create(self, model, values):
-        obj = model()
-        cleaned_values = {k: v for k, v in values.items()
-                          if k not in self._get_relationships(model)}
-        obj.update(cleaned_values)
-        obj.save()
-        return obj
+        with _session_for_write() as session:
+            obj = model()
+            cleaned_values = {k: v for k, v in values.items()
+                              if k not in self._get_relationships(model)}
+            obj.update(cleaned_values)
+            session.add(obj)
+            session.flush()
+            return obj

     def _get(self, context, model, fieldname, value, eager):
-        query = model_query(model)
-        if eager:
-            query = self._set_eager_options(model, query)
-        query = query.filter(getattr(model, fieldname) == value)
-        if not context.show_deleted:
-            query = query.filter(model.deleted_at.is_(None))
-        try:
-            obj = query.one()
-        except exc.NoResultFound:
-            raise exception.ResourceNotFound(name=model.__name__, id=value)
-        return obj
+        with _session_for_read() as session:
+            query = session.query(model)
+            if eager:
+                query = self._set_eager_options(model, query)
+            query = query.filter(getattr(model, fieldname) == value)
+            if not context.show_deleted:
+                query = query.filter(model.deleted_at.is_(None))
+            try:
+                obj = query.one()
+            except exc.NoResultFound:
+                raise exception.ResourceNotFound(name=model.__name__, id=value)
+            return obj

     @staticmethod
+    @oslo_db_api.retry_on_deadlock
     def _update(model, id_, values):
-        session = get_session()
-        with session.begin():
-            query = model_query(model, session=session)
+        with _session_for_write() as session:
+            query = session.query(model)
             query = add_identity_filter(query, id_)
             try:
                 ref = query.with_for_update().one()
@@ -287,13 +272,14 @@ class Connection(api.BaseConnection):
                 raise exception.ResourceNotFound(name=model.__name__, id=id_)

             ref.update(values)
-        return ref
+            return ref

     @staticmethod
+    @oslo_db_api.retry_on_deadlock
     def _soft_delete(model, id_):
-        session = get_session()
-        with session.begin():
-            query = model_query(model, session=session)
+        with _session_for_write() as session:
+            query = session.query(model)
             query = add_identity_filter(query, id_)
             try:
                 row = query.one()
@@ -305,10 +291,10 @@ class Connection(api.BaseConnection):
         return row

     @staticmethod
+    @oslo_db_api.retry_on_deadlock
     def _destroy(model, id_):
-        session = get_session()
-        with session.begin():
-            query = model_query(model, session=session)
+        with _session_for_write() as session:
+            query = session.query(model)
             query = add_identity_filter(query, id_)

             try:
@@ -321,14 +307,15 @@ class Connection(api.BaseConnection):
     def _get_model_list(self, model, add_filters_func, context, filters=None,
                         limit=None, marker=None, sort_key=None, sort_dir=None,
                         eager=False):
-        query = model_query(model)
-        if eager:
-            query = self._set_eager_options(model, query)
-        query = add_filters_func(query, filters)
-        if not context.show_deleted:
-            query = query.filter(model.deleted_at.is_(None))
-        return _paginate_query(model, limit, marker,
-                               sort_key, sort_dir, query)
+        with _session_for_read() as session:
+            query = session.query(model)
+            if eager:
+                query = self._set_eager_options(model, query)
+            query = add_filters_func(query, filters)
+            if not context.show_deleted:
+                query = query.filter(model.deleted_at.is_(None))
+            return _paginate_query(model, limit, marker,
+                                   sort_key, sort_dir, query)

     # NOTE(erakli): _add_..._filters methods should be refactored to have same
     # content. join_fieldmap should be filled with JoinMap instead of dict
@@ -423,11 +410,12 @@ class Connection(api.BaseConnection):
             plain_fields=plain_fields, join_fieldmap=join_fieldmap)

         if 'audit_uuid' in filters:
-            stmt = model_query(models.ActionPlan).join(
-                models.Audit,
-                models.Audit.id == models.ActionPlan.audit_id)\
-                .filter_by(uuid=filters['audit_uuid']).subquery()
-            query = query.filter_by(action_plan_id=stmt.c.id)
+            with _session_for_read() as session:
+                stmt = session.query(models.ActionPlan).join(
+                    models.Audit,
+                    models.Audit.id == models.ActionPlan.audit_id)\
+                    .filter_by(uuid=filters['audit_uuid']).subquery()
+                query = query.filter_by(action_plan_id=stmt.c.id)
         return query
@@ -605,20 +593,21 @@ class Connection(api.BaseConnection):
         if not values.get('uuid'):
             values['uuid'] = utils.generate_uuid()

-        query = model_query(models.AuditTemplate)
-        query = query.filter_by(name=values.get('name'),
-                                deleted_at=None)
-        if len(query.all()) > 0:
-            raise exception.AuditTemplateAlreadyExists(
-                audit_template=values['name'])
-
-        try:
-            audit_template = self._create(models.AuditTemplate, values)
-        except db_exc.DBDuplicateEntry:
-            raise exception.AuditTemplateAlreadyExists(
-                audit_template=values['name'])
-        return audit_template
+        with _session_for_write() as session:
+            query = session.query(models.AuditTemplate)
+            query = query.filter_by(name=values.get('name'),
+                                    deleted_at=None)
+            if len(query.all()) > 0:
+                raise exception.AuditTemplateAlreadyExists(
+                    audit_template=values['name'])
+
+            try:
+                audit_template = self._create(models.AuditTemplate, values)
+            except db_exc.DBDuplicateEntry:
+                raise exception.AuditTemplateAlreadyExists(
+                    audit_template=values['name'])
+            return audit_template

     def _get_audit_template(self, context, fieldname, value, eager):
         try:
@@ -680,25 +669,26 @@ class Connection(api.BaseConnection):
         if not values.get('uuid'):
             values['uuid'] = utils.generate_uuid()

-        query = model_query(models.Audit)
-        query = query.filter_by(name=values.get('name'),
-                                deleted_at=None)
-        if len(query.all()) > 0:
-            raise exception.AuditAlreadyExists(
-                audit=values['name'])
-
-        if values.get('state') is None:
-            values['state'] = objects.audit.State.PENDING
-
-        if not values.get('auto_trigger'):
-            values['auto_trigger'] = False
-
-        try:
-            audit = self._create(models.Audit, values)
-        except db_exc.DBDuplicateEntry:
-            raise exception.AuditAlreadyExists(audit=values['uuid'])
-        return audit
+        with _session_for_write() as session:
+            query = session.query(models.Audit)
+            query = query.filter_by(name=values.get('name'),
+                                    deleted_at=None)
+            if len(query.all()) > 0:
+                raise exception.AuditAlreadyExists(
+                    audit=values['name'])
+
+            if values.get('state') is None:
+                values['state'] = objects.audit.State.PENDING
+
+            if not values.get('auto_trigger'):
+                values['auto_trigger'] = False
+
+            try:
+                audit = self._create(models.Audit, values)
+            except db_exc.DBDuplicateEntry:
+                raise exception.AuditAlreadyExists(audit=values['uuid'])
+            return audit

     def _get_audit(self, context, fieldname, value, eager):
         try:
@@ -722,14 +712,13 @@ class Connection(api.BaseConnection):
     def destroy_audit(self, audit_id):
         def is_audit_referenced(session, audit_id):
             """Checks whether the audit is referenced by action_plan(s)."""
-            query = model_query(models.ActionPlan, session=session)
+            query = session.query(models.ActionPlan)
             query = self._add_action_plans_filters(
                 query, {'audit_id': audit_id})
             return query.count() != 0

-        session = get_session()
-        with session.begin():
-            query = model_query(models.Audit, session=session)
+        with _session_for_write() as session:
+            query = session.query(models.Audit)
             query = add_identity_filter(query, audit_id)

             try:
@@ -796,9 +785,8 @@ class Connection(api.BaseConnection):
             context, fieldname="uuid", value=action_uuid, eager=eager)

     def destroy_action(self, action_id):
-        session = get_session()
+        with _session_for_write() as session:
with session.begin(): query = session.query(models.Action)
query = model_query(models.Action, session=session)
query = add_identity_filter(query, action_id) query = add_identity_filter(query, action_id)
count = query.delete() count = query.delete()
if count != 1: if count != 1:
@@ -814,9 +802,8 @@ class Connection(api.BaseConnection):
@staticmethod @staticmethod
def _do_update_action(action_id, values): def _do_update_action(action_id, values):
session = get_session() with _session_for_write() as session:
with session.begin(): query = session.query(models.Action)
query = model_query(models.Action, session=session)
query = add_identity_filter(query, action_id) query = add_identity_filter(query, action_id)
try: try:
ref = query.with_for_update().one() ref = query.with_for_update().one()
@@ -824,7 +811,7 @@ class Connection(api.BaseConnection):
raise exception.ActionNotFound(action=action_id) raise exception.ActionNotFound(action=action_id)
ref.update(values) ref.update(values)
return ref return ref
def soft_delete_action(self, action_id): def soft_delete_action(self, action_id):
try: try:
@@ -868,14 +855,13 @@ class Connection(api.BaseConnection):
def destroy_action_plan(self, action_plan_id): def destroy_action_plan(self, action_plan_id):
def is_action_plan_referenced(session, action_plan_id): def is_action_plan_referenced(session, action_plan_id):
"""Checks whether the action_plan is referenced by action(s).""" """Checks whether the action_plan is referenced by action(s)."""
query = model_query(models.Action, session=session) query = session.query(models.Action)
query = self._add_actions_filters( query = self._add_actions_filters(
query, {'action_plan_id': action_plan_id}) query, {'action_plan_id': action_plan_id})
return query.count() != 0 return query.count() != 0
session = get_session() with _session_for_write() as session:
with session.begin(): query = session.query(models.ActionPlan)
query = model_query(models.ActionPlan, session=session)
query = add_identity_filter(query, action_plan_id) query = add_identity_filter(query, action_plan_id)
try: try:
@@ -899,9 +885,8 @@ class Connection(api.BaseConnection):
@staticmethod @staticmethod
def _do_update_action_plan(action_plan_id, values): def _do_update_action_plan(action_plan_id, values):
session = get_session() with _session_for_write() as session:
with session.begin(): query = session.query(models.ActionPlan)
query = model_query(models.ActionPlan, session=session)
query = add_identity_filter(query, action_plan_id) query = add_identity_filter(query, action_plan_id)
try: try:
ref = query.with_for_update().one() ref = query.with_for_update().one()
@@ -909,7 +894,7 @@ class Connection(api.BaseConnection):
raise exception.ActionPlanNotFound(action_plan=action_plan_id) raise exception.ActionPlanNotFound(action_plan=action_plan_id)
ref.update(values) ref.update(values)
return ref return ref
def soft_delete_action_plan(self, action_plan_id): def soft_delete_action_plan(self, action_plan_id):
try: try:


@@ -22,6 +22,7 @@ from apscheduler.jobstores.base import ConflictingIdError
 from apscheduler.jobstores import sqlalchemy
 from apscheduler.util import datetime_to_utc_timestamp
 from apscheduler.util import maybe_ref
+from apscheduler.util import utc_timestamp_to_datetime

 from watcher.common import context
 from watcher.common import service
@@ -32,7 +33,7 @@
 except ImportError:  # pragma: nocover
     import pickle

-from sqlalchemy import Table, MetaData, select, and_
+from sqlalchemy import Table, MetaData, select, and_, null
 from sqlalchemy.exc import IntegrityError
@@ -58,8 +59,7 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
         super(WatcherJobStore, self).__init__(url, engine, tablename,
                                               metadata, pickle_protocol)
         metadata = maybe_ref(metadata) or MetaData()
-        self.jobs_t = Table(tablename, metadata, autoload=True,
-                            autoload_with=engine)
+        self.jobs_t = Table(tablename, metadata, autoload_with=engine)
         service_ident = service.ServiceHeartbeat.get_service_name()
         self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
         self.service_id = objects.Service.list(context=context.make_context(),
@@ -79,7 +79,8 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
             'tag': jsonutils.dumps(self.tag)
         })
         try:
-            self.engine.execute(insert)
+            with self.engine.begin() as conn:
+                conn.execute(insert)
         except IntegrityError:
             raise ConflictingIdError(job.id)
@@ -88,20 +89,36 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
         self._fix_paused_jobs_sorting(jobs)
         return jobs

+    def get_next_run_time(self):
+        selectable = select(self.jobs_t.c.next_run_time).\
+            where(self.jobs_t.c.next_run_time != null()).\
+            order_by(self.jobs_t.c.next_run_time).limit(1)
+        with self.engine.begin() as connection:
+            # NOTE(danms): The apscheduler implementation of this gets a
+            # decimal.Decimal back from scalar() which causes
+            # utc_timestamp_to_datetime() to choke since it is expecting a
+            # python float. Assume this is SQLAlchemy 2.0 stuff, so just
+            # coerce to a float here.
+            next_run_time = connection.execute(selectable).scalar()
+            return utc_timestamp_to_datetime(float(next_run_time)
+                                             if next_run_time is not None
+                                             else None)
+
     def _get_jobs(self, *conditions):
         jobs = []
         conditions += (self.jobs_t.c.service_id == self.service_id,)
         selectable = select(
-            [self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag]
+            self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag
         ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
         failed_job_ids = set()
-        for row in self.engine.execute(selectable):
-            try:
-                jobs.append(self._reconstitute_job(row.job_state))
-            except Exception:
-                self._logger.exception(
-                    'Unable to restore job "%s" -- removing it', row.id)
-                failed_job_ids.add(row.id)
+        with self.engine.begin() as conn:
+            for row in conn.execute(selectable):
+                try:
+                    jobs.append(self._reconstitute_job(row.job_state))
+                except Exception:
+                    self._logger.exception(
+                        'Unable to restore job "%s" -- removing it', row.id)
+                    failed_job_ids.add(row.id)

         # Remove all the jobs we failed to restore
         if failed_job_ids:

@@ -20,9 +20,9 @@
 import alembic
 from alembic import config as alembic_config
 import alembic.migration as alembic_migration
 from oslo_db import exception as db_exc
+from oslo_db.sqlalchemy import enginefacade

 from watcher._i18n import _
-from watcher.db.sqlalchemy import api as sqla_api
 from watcher.db.sqlalchemy import models
@@ -39,7 +39,7 @@ def version(engine=None):
     :rtype: string
     """
     if engine is None:
-        engine = sqla_api.get_engine()
+        engine = enginefacade.reader.get_engine()
     with engine.connect() as conn:
         context = alembic_migration.MigrationContext.configure(conn)
         return context.get_current_revision()
@@ -63,7 +63,7 @@ def create_schema(config=None, engine=None):
     Can be used for initial installation instead of upgrade('head').
     """
     if engine is None:
-        engine = sqla_api.get_engine()
+        engine = enginefacade.writer.get_engine()

     # NOTE(viktors): If we will use metadata.create_all() for non empty db
     #                schema, it will only add the new tables, but leave


@@ -93,14 +93,6 @@ class WatcherBase(models.SoftDeleteMixin,
             d[c.name] = self[c.name]
         return d

-    def save(self, session=None):
-        import watcher.db.sqlalchemy.api as db_api
-
-        if session is None:
-            session = db_api.get_session()
-
-        super(WatcherBase, self).save(session)
-

 Base = declarative_base(cls=WatcherBase)


@@ -52,7 +52,7 @@ class ContinuousAuditHandler(base.AuditHandler):
             self._audit_scheduler = scheduling.BackgroundSchedulerService(
                 jobstores={
                     'default': job_store.WatcherJobStore(
-                        engine=sq_api.get_engine()),
+                        engine=sq_api.enginefacade.writer.get_engine()),
                 }
             )
         return self._audit_scheduler


@@ -57,8 +57,19 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
         self.planned_cold_count = 0
         self.volume_count = 0
         self.planned_volume_count = 0
-        self.volume_update_count = 0
-        self.planned_volume_update_count = 0
+
+    # TODO(sean-n-mooney) This is backward compatibility
+    # for calling the swap code paths. Swap is now an alias
+    # for migrate, we should clean this up in a future
+    # cycle.
+    @property
+    def volume_update_count(self):
+        return self.volume_count
+
+    # same as above clean up later.
+    @property
+    def planned_volume_update_count(self):
+        return self.planned_volume_count

     @classmethod
     def get_name(cls):
@@ -312,8 +323,8 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
             planned_cold_migrate_instance_count=self.planned_cold_count,
             volume_migrate_count=self.volume_count,
             planned_volume_migrate_count=self.planned_volume_count,
-            volume_update_count=self.volume_update_count,
-            planned_volume_update_count=self.planned_volume_update_count
+            volume_update_count=self.volume_count,
+            planned_volume_update_count=self.planned_volume_count
         )

     def set_migration_count(self, targets):
@@ -328,10 +339,7 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
             elif self.is_cold(instance):
                 self.cold_count += 1
         for volume in targets.get('volume', []):
-            if self.is_available(volume):
-                self.volume_count += 1
-            elif self.is_in_use(volume):
-                self.volume_update_count += 1
+            self.volume_count += 1

     def is_live(self, instance):
         status = getattr(instance, 'status')
@@ -404,19 +412,16 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
                 LOG.debug(src_type)
                 LOG.debug("%s %s", dst_pool, dst_type)

-                if self.is_available(volume):
-                    if src_type == dst_type:
-                        self._volume_migrate(volume, dst_pool)
-                    else:
-                        self._volume_retype(volume, dst_type)
-                elif self.is_in_use(volume):
-                    self._volume_update(volume, dst_type)
+                if src_type == dst_type:
+                    self._volume_migrate(volume, dst_pool)
+                else:
+                    self._volume_retype(volume, dst_type)

                 # if with_attached_volume is True, migrate attaching instances
                 if self.with_attached_volume:
                     instances = [self.nova.find_instance(dic.get('server_id'))
                                  for dic in volume.attachments]
                     self.instances_migration(instances, action_counter)

                 action_counter.add_pool(pool)
@@ -464,16 +469,6 @@ class ZoneMigration(base.ZoneMigrationBaseStrategy):
                 input_parameters=parameters)
             self.planned_cold_count += 1

-    def _volume_update(self, volume, dst_type):
-        parameters = {"migration_type": "swap",
-                      "destination_type": dst_type,
-                      "resource_name": volume.name}
-        self.solution.add_action(
-            action_type="volume_migrate",
-            resource_id=volume.id,
-            input_parameters=parameters)
-        self.planned_volume_update_count += 1
-
     def _volume_migrate(self, volume, dst_pool):
         parameters = {"migration_type": "migrate",
                       "destination_node": dst_pool,

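The strategy hunk above keeps the retired `*_update_count` counter names alive as read-only properties over the surviving counters, so existing callers keep working while only one set of counters is maintained. A self-contained sketch of the same aliasing approach (hypothetical class, not the real strategy):

```python
class ZoneMigrationCounters:
    """Illustration of backward-compatible counter aliasing."""

    def __init__(self):
        self.volume_count = 0
        self.planned_volume_count = 0

    # Old name preserved as a read-only alias; writes now go only
    # through volume_count, so the two can never drift apart.
    @property
    def volume_update_count(self):
        return self.volume_count

    @property
    def planned_volume_update_count(self):
        return self.planned_volume_count


c = ZoneMigrationCounters()
c.volume_count += 3
print(c.volume_update_count)  # 3
```

Because the properties have no setters, any legacy code that tried to assign to the old names now fails loudly with `AttributeError` instead of silently diverging from the real counter.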

@@ -22,7 +22,6 @@ from watcher.common import cinder_helper
 from watcher.common import clients
 from watcher.common import keystone_helper
 from watcher.common import nova_helper
-from watcher.common import utils as w_utils
 from watcher.tests import base
@@ -102,12 +101,15 @@ class TestMigration(base.TestCase):

     @staticmethod
     def fake_volume(**kwargs):
+        # FIXME(sean-k-mooney): we should be using real objects in this
+        # test or at least something more representative of the real data
         volume = mock.MagicMock()
         volume.id = kwargs.get('id', TestMigration.VOLUME_UUID)
         volume.size = kwargs.get('size', '1')
         volume.status = kwargs.get('status', 'available')
         volume.snapshot_id = kwargs.get('snapshot_id', None)
         volume.availability_zone = kwargs.get('availability_zone', 'nova')
+        volume.attachments = kwargs.get('attachments', [])
         return volume

     @staticmethod
@@ -175,42 +177,14 @@ class TestMigration(base.TestCase):
             "storage1-typename",
         )

-    def test_swap_success(self):
-        volume = self.fake_volume(
-            status='in-use', attachments=[{'server_id': 'server_id'}])
-        self.m_n_helper.find_instance.return_value = self.fake_instance()
-        new_volume = self.fake_volume(id=w_utils.generate_uuid())
-        user = mock.Mock()
-        session = mock.MagicMock()
-        self.m_k_helper.create_user.return_value = user
-        self.m_k_helper.create_session.return_value = session
-        self.m_c_helper.get_volume.return_value = volume
-        self.m_c_helper.create_volume.return_value = new_volume
-
-        result = self.action_swap.execute()
-        self.assertTrue(result)
-
-        self.m_n_helper.swap_volume.assert_called_once_with(
-            volume,
-            new_volume
-        )
-        self.m_k_helper.delete_user.assert_called_once_with(user)
-
-    def test_swap_fail(self):
-        # _can_swap fail
-        instance = self.fake_instance(status='STOPPED')
-        self.m_n_helper.find_instance.return_value = instance
-
-        result = self.action_swap.execute()
-        self.assertFalse(result)
-
     def test_can_swap_success(self):
         volume = self.fake_volume(
-            status='in-use', attachments=[{'server_id': 'server_id'}])
+            status='in-use', attachments=[
+                {'server_id': TestMigration.INSTANCE_UUID}])
         instance = self.fake_instance()
         self.m_n_helper.find_instance.return_value = instance

         result = self.action_swap._can_swap(volume)
         self.assertTrue(result)
@@ -219,16 +193,33 @@ class TestMigration(base.TestCase):
         result = self.action_swap._can_swap(volume)
         self.assertTrue(result)

-        instance = self.fake_instance(status='RESIZED')
-        self.m_n_helper.find_instance.return_value = instance
-
-        result = self.action_swap._can_swap(volume)
-        self.assertTrue(result)
-
     def test_can_swap_fail(self):
         volume = self.fake_volume(
-            status='in-use', attachments=[{'server_id': 'server_id'}])
+            status='in-use', attachments=[
+                {'server_id': TestMigration.INSTANCE_UUID}])
         instance = self.fake_instance(status='STOPPED')
         self.m_n_helper.find_instance.return_value = instance

         result = self.action_swap._can_swap(volume)
         self.assertFalse(result)
+
+        instance = self.fake_instance(status='RESIZED')
+        self.m_n_helper.find_instance.return_value = instance
+
+        result = self.action_swap._can_swap(volume)
+        self.assertFalse(result)
+
+    def test_swap_success(self):
+        volume = self.fake_volume(
+            status='in-use', attachments=[
+                {'server_id': TestMigration.INSTANCE_UUID}])
+        self.m_c_helper.get_volume.return_value = volume
+        instance = self.fake_instance()
+        self.m_n_helper.find_instance.return_value = instance
+
+        result = self.action_swap.execute()
+        self.assertTrue(result)
+
+        self.m_c_helper.migrate.assert_called_once_with(
+            volume,
+            "storage1-poolname"
+        )


@@ -17,9 +17,10 @@
 import fixtures
 from oslo_config import cfg
+from oslo_db.sqlalchemy import enginefacade

 from watcher.db import api as dbapi
-from watcher.db.sqlalchemy import api as sqla_api
 from watcher.db.sqlalchemy import migration
 from watcher.db.sqlalchemy import models
 from watcher.tests import base
@@ -35,16 +36,16 @@ _DB_CACHE = None

 class Database(fixtures.Fixture):

-    def __init__(self, db_api, db_migrate, sql_connection):
+    def __init__(self, engine, db_migrate, sql_connection):
         self.sql_connection = sql_connection

-        self.engine = db_api.get_engine()
+        self.engine = engine
         self.engine.dispose()
-        conn = self.engine.connect()
-        self.setup_sqlite(db_migrate)
-        self.post_migrations()
-
-        self._DB = "".join(line for line in conn.connection.iterdump())
+        with self.engine.connect() as conn:
+            self.setup_sqlite(db_migrate)
+            self.post_migrations()
+            self._DB = "".join(line for line in conn.connection.iterdump())
         self.engine.dispose()

     def setup_sqlite(self, db_migrate):
@@ -55,9 +56,8 @@ class Database(fixtures.Fixture):

     def setUp(self):
         super(Database, self).setUp()
-
-        conn = self.engine.connect()
-        conn.connection.executescript(self._DB)
+        with self.engine.connect() as conn:
+            conn.connection.executescript(self._DB)
         self.addCleanup(self.engine.dispose)

     def post_migrations(self):
@@ -80,7 +80,9 @@ class DbTestCase(base.TestCase):
         global _DB_CACHE
         if not _DB_CACHE:
-            _DB_CACHE = Database(sqla_api, migration,
+            engine = enginefacade.writer.get_engine()
+            _DB_CACHE = Database(engine, migration,
                                  sql_connection=CONF.database.connection)
+            engine.dispose()
         self.useFixture(_DB_CACHE)
         self._id_gen = utils.id_generator()


@@ -263,7 +263,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
         cfg.CONF.set_override("host", "hostname1")

     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
     @mock.patch.object(objects.audit.Audit, 'list')
@@ -286,7 +286,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
         self.assertIsNone(self.audits[1].next_run_time)

     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
     @mock.patch.object(objects.audit.Audit, 'list')
@@ -309,7 +309,7 @@ class TestContinuousAuditHandler(base.DbTestCase):

     @mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
     @mock.patch.object(objects.audit.Audit, 'list')
@@ -328,7 +328,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
             audit_handler.launch_audits_periodically)

     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
     @mock.patch.object(objects.audit.Audit, 'list')
@@ -349,7 +349,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
         m_add_job.assert_has_calls(calls)

     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
     @mock.patch.object(objects.audit.Audit, 'list')
@@ -384,7 +384,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
         self.assertTrue(is_inactive)

     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
     @mock.patch.object(objects.audit.AuditStateTransitionManager,
                        'is_inactive')
@@ -406,7 +406,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
         self.assertIsNotNone(self.audits[0].next_run_time)

     @mock.patch.object(objects.service.Service, 'list')
-    @mock.patch.object(sq_api, 'get_engine')
+    @mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
     @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')


@@ -320,7 +320,10 @@ class TestZoneMigration(TestBaseStrategy):
         migration_types = collections.Counter(
             [action.get('input_parameters')['migration_type']
              for action in solution.actions])
-        self.assertEqual(1, migration_types.get("swap", 0))
+        # watcher no longer implements swap. it is now an
+        # alias for migrate.
+        self.assertEqual(0, migration_types.get("swap", 0))
+        self.assertEqual(1, migration_types.get("migrate", 1))
         global_efficacy_value = solution.global_efficacy[3].get('value', 0)
         self.assertEqual(100, global_efficacy_value)
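The updated assertion tallies the `migration_type` of every planned action with `collections.Counter` and checks that no "swap" actions remain. A stand-alone sketch of that counting pattern with illustrative action data (the dicts below are made up for the example):

```python
import collections

# Hypothetical planned actions, shaped like the solution.actions entries
# inspected by the test above.
actions = [
    {"input_parameters": {"migration_type": "migrate"}},
    {"input_parameters": {"migration_type": "live"}},
]

migration_types = collections.Counter(
    action["input_parameters"]["migration_type"] for action in actions)

print(migration_types.get("swap", 0))     # 0
print(migration_types.get("migrate", 0))  # 1
```

Note that `Counter.get(key, default)` returns the default when the key was never counted, so the default chosen in an assertion decides whether an absent key passes or fails the check.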