Further database refactoring
More refactoring of the SQLAlchemy database layer to improve compatility with eventlet on newer Pythons. Inspired by0ce2c41404Related-Bug: 2067815 Change-Id: Ib5e9aa288232cc1b766bbf2a8ce2113d5a8e2f7d (cherry picked from commit753c44b0c4)
This commit is contained in:
committed by
Alfredo Moralejo
parent
8b0f1dbf66
commit
54b3b58428
@@ -11,12 +11,15 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_context import context
|
from oslo_context import context
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@enginefacade.transaction_context_provider
|
||||||
class RequestContext(context.RequestContext):
|
class RequestContext(context.RequestContext):
|
||||||
"""Extends security contexts from the OpenStack common library."""
|
"""Extends security contexts from the OpenStack common library."""
|
||||||
|
|
||||||
|
|||||||
@@ -13,8 +13,8 @@
|
|||||||
from logging import config as log_config
|
from logging import config as log_config
|
||||||
|
|
||||||
from alembic import context
|
from alembic import context
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
|
|
||||||
from watcher.db.sqlalchemy import api as sqla_api
|
|
||||||
from watcher.db.sqlalchemy import models
|
from watcher.db.sqlalchemy import models
|
||||||
|
|
||||||
# this is the Alembic Config object, which provides
|
# this is the Alembic Config object, which provides
|
||||||
@@ -43,7 +43,7 @@ def run_migrations_online():
|
|||||||
and associate a connection with the context.
|
and associate a connection with the context.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
engine = sqla_api.get_engine()
|
engine = enginefacade.writer.get_engine()
|
||||||
with engine.connect() as connection:
|
with engine.connect() as connection:
|
||||||
context.configure(connection=connection,
|
context.configure(connection=connection,
|
||||||
target_metadata=target_metadata)
|
target_metadata=target_metadata)
|
||||||
|
|||||||
@@ -19,8 +19,10 @@
|
|||||||
import collections
|
import collections
|
||||||
import datetime
|
import datetime
|
||||||
import operator
|
import operator
|
||||||
|
import threading
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_db import api as oslo_db_api
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
from oslo_db.sqlalchemy import enginefacade
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
from oslo_db.sqlalchemy import utils as db_utils
|
from oslo_db.sqlalchemy import utils as db_utils
|
||||||
@@ -38,26 +40,7 @@ from watcher import objects
|
|||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
_FACADE = None
|
_CONTEXT = threading.local()
|
||||||
|
|
||||||
|
|
||||||
def _create_facade_lazily():
|
|
||||||
global _FACADE
|
|
||||||
if _FACADE is None:
|
|
||||||
ctx = enginefacade.transaction_context()
|
|
||||||
_FACADE = ctx.writer
|
|
||||||
return _FACADE
|
|
||||||
|
|
||||||
|
|
||||||
def get_engine():
|
|
||||||
facade = _create_facade_lazily()
|
|
||||||
return facade.get_engine()
|
|
||||||
|
|
||||||
|
|
||||||
def get_session(**kwargs):
|
|
||||||
facade = _create_facade_lazily()
|
|
||||||
sessionmaker = facade.get_sessionmaker()
|
|
||||||
return sessionmaker(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def get_backend():
|
def get_backend():
|
||||||
@@ -65,14 +48,15 @@ def get_backend():
|
|||||||
return Connection()
|
return Connection()
|
||||||
|
|
||||||
|
|
||||||
def model_query(model, *args, **kwargs):
|
def _session_for_read():
|
||||||
"""Query helper for simpler session usage.
|
return enginefacade.reader.using(_CONTEXT)
|
||||||
|
|
||||||
:param session: if present, the session to use
|
|
||||||
"""
|
# NOTE(tylerchristie) Please add @oslo_db_api.retry_on_deadlock decorator to
|
||||||
session = kwargs.get('session') or get_session()
|
# any new methods using _session_for_write (as deadlocks happen on write), so
|
||||||
query = session.query(model, *args)
|
# that oslo_db is able to retry in case of deadlocks.
|
||||||
return query
|
def _session_for_write():
|
||||||
|
return enginefacade.writer.using(_CONTEXT)
|
||||||
|
|
||||||
|
|
||||||
def add_identity_filter(query, value):
|
def add_identity_filter(query, value):
|
||||||
@@ -95,8 +79,6 @@ def add_identity_filter(query, value):
|
|||||||
|
|
||||||
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||||
sort_dir=None, query=None):
|
sort_dir=None, query=None):
|
||||||
if not query:
|
|
||||||
query = model_query(model)
|
|
||||||
sort_keys = ['id']
|
sort_keys = ['id']
|
||||||
if sort_key and sort_key not in sort_keys:
|
if sort_key and sort_key not in sort_keys:
|
||||||
sort_keys.insert(0, sort_key)
|
sort_keys.insert(0, sort_key)
|
||||||
@@ -250,21 +232,20 @@ class Connection(api.BaseConnection):
|
|||||||
getattr(model, relationship.key)))
|
getattr(model, relationship.key)))
|
||||||
return query
|
return query
|
||||||
|
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _create(self, model, values):
|
def _create(self, model, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
|
||||||
obj = model()
|
obj = model()
|
||||||
cleaned_values = {k: v for k, v in values.items()
|
cleaned_values = {k: v for k, v in values.items()
|
||||||
if k not in self._get_relationships(model)}
|
if k not in self._get_relationships(model)}
|
||||||
obj.update(cleaned_values)
|
obj.update(cleaned_values)
|
||||||
obj.save(session=session)
|
session.add(obj)
|
||||||
session.commit()
|
session.flush()
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
def _get(self, context, model, fieldname, value, eager):
|
def _get(self, context, model, fieldname, value, eager):
|
||||||
session = get_session()
|
with _session_for_read() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
if eager:
|
if eager:
|
||||||
query = self._set_eager_options(model, query)
|
query = self._set_eager_options(model, query)
|
||||||
|
|
||||||
@@ -277,13 +258,13 @@ class Connection(api.BaseConnection):
|
|||||||
except exc.NoResultFound:
|
except exc.NoResultFound:
|
||||||
raise exception.ResourceNotFound(name=model.__name__, id=value)
|
raise exception.ResourceNotFound(name=model.__name__, id=value)
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _update(model, id_, values):
|
def _update(model, id_, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
query = add_identity_filter(query, id_)
|
query = add_identity_filter(query, id_)
|
||||||
try:
|
try:
|
||||||
ref = query.with_for_update().one()
|
ref = query.with_for_update().one()
|
||||||
@@ -291,13 +272,14 @@ class Connection(api.BaseConnection):
|
|||||||
raise exception.ResourceNotFound(name=model.__name__, id=id_)
|
raise exception.ResourceNotFound(name=model.__name__, id=id_)
|
||||||
|
|
||||||
ref.update(values)
|
ref.update(values)
|
||||||
return ref
|
|
||||||
|
return ref
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _soft_delete(model, id_):
|
def _soft_delete(model, id_):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
query = add_identity_filter(query, id_)
|
query = add_identity_filter(query, id_)
|
||||||
try:
|
try:
|
||||||
row = query.one()
|
row = query.one()
|
||||||
@@ -309,10 +291,10 @@ class Connection(api.BaseConnection):
|
|||||||
return row
|
return row
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@oslo_db_api.retry_on_deadlock
|
||||||
def _destroy(model, id_):
|
def _destroy(model, id_):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(model)
|
||||||
query = model_query(model, session=session)
|
|
||||||
query = add_identity_filter(query, id_)
|
query = add_identity_filter(query, id_)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -325,14 +307,15 @@ class Connection(api.BaseConnection):
|
|||||||
def _get_model_list(self, model, add_filters_func, context, filters=None,
|
def _get_model_list(self, model, add_filters_func, context, filters=None,
|
||||||
limit=None, marker=None, sort_key=None, sort_dir=None,
|
limit=None, marker=None, sort_key=None, sort_dir=None,
|
||||||
eager=False):
|
eager=False):
|
||||||
query = model_query(model)
|
with _session_for_read() as session:
|
||||||
if eager:
|
query = session.query(model)
|
||||||
query = self._set_eager_options(model, query)
|
if eager:
|
||||||
query = add_filters_func(query, filters)
|
query = self._set_eager_options(model, query)
|
||||||
if not context.show_deleted:
|
query = add_filters_func(query, filters)
|
||||||
query = query.filter(model.deleted_at.is_(None))
|
if not context.show_deleted:
|
||||||
return _paginate_query(model, limit, marker,
|
query = query.filter(model.deleted_at.is_(None))
|
||||||
sort_key, sort_dir, query)
|
return _paginate_query(model, limit, marker,
|
||||||
|
sort_key, sort_dir, query)
|
||||||
|
|
||||||
# NOTE(erakli): _add_..._filters methods should be refactored to have same
|
# NOTE(erakli): _add_..._filters methods should be refactored to have same
|
||||||
# content. join_fieldmap should be filled with JoinMap instead of dict
|
# content. join_fieldmap should be filled with JoinMap instead of dict
|
||||||
@@ -427,11 +410,12 @@ class Connection(api.BaseConnection):
|
|||||||
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
|
plain_fields=plain_fields, join_fieldmap=join_fieldmap)
|
||||||
|
|
||||||
if 'audit_uuid' in filters:
|
if 'audit_uuid' in filters:
|
||||||
stmt = model_query(models.ActionPlan).join(
|
with _session_for_read() as session:
|
||||||
models.Audit,
|
stmt = session.query(models.ActionPlan).join(
|
||||||
models.Audit.id == models.ActionPlan.audit_id)\
|
models.Audit,
|
||||||
.filter_by(uuid=filters['audit_uuid']).subquery()
|
models.Audit.id == models.ActionPlan.audit_id)\
|
||||||
query = query.filter_by(action_plan_id=stmt.c.id)
|
.filter_by(uuid=filters['audit_uuid']).subquery()
|
||||||
|
query = query.filter_by(action_plan_id=stmt.c.id)
|
||||||
|
|
||||||
return query
|
return query
|
||||||
|
|
||||||
@@ -609,20 +593,21 @@ class Connection(api.BaseConnection):
|
|||||||
if not values.get('uuid'):
|
if not values.get('uuid'):
|
||||||
values['uuid'] = utils.generate_uuid()
|
values['uuid'] = utils.generate_uuid()
|
||||||
|
|
||||||
query = model_query(models.AuditTemplate)
|
with _session_for_write() as session:
|
||||||
query = query.filter_by(name=values.get('name'),
|
query = session.query(models.AuditTemplate)
|
||||||
deleted_at=None)
|
query = query.filter_by(name=values.get('name'),
|
||||||
|
deleted_at=None)
|
||||||
|
|
||||||
if len(query.all()) > 0:
|
if len(query.all()) > 0:
|
||||||
raise exception.AuditTemplateAlreadyExists(
|
raise exception.AuditTemplateAlreadyExists(
|
||||||
audit_template=values['name'])
|
audit_template=values['name'])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
audit_template = self._create(models.AuditTemplate, values)
|
audit_template = self._create(models.AuditTemplate, values)
|
||||||
except db_exc.DBDuplicateEntry:
|
except db_exc.DBDuplicateEntry:
|
||||||
raise exception.AuditTemplateAlreadyExists(
|
raise exception.AuditTemplateAlreadyExists(
|
||||||
audit_template=values['name'])
|
audit_template=values['name'])
|
||||||
return audit_template
|
return audit_template
|
||||||
|
|
||||||
def _get_audit_template(self, context, fieldname, value, eager):
|
def _get_audit_template(self, context, fieldname, value, eager):
|
||||||
try:
|
try:
|
||||||
@@ -684,25 +669,26 @@ class Connection(api.BaseConnection):
|
|||||||
if not values.get('uuid'):
|
if not values.get('uuid'):
|
||||||
values['uuid'] = utils.generate_uuid()
|
values['uuid'] = utils.generate_uuid()
|
||||||
|
|
||||||
query = model_query(models.Audit)
|
with _session_for_write() as session:
|
||||||
query = query.filter_by(name=values.get('name'),
|
query = session.query(models.Audit)
|
||||||
deleted_at=None)
|
query = query.filter_by(name=values.get('name'),
|
||||||
|
deleted_at=None)
|
||||||
|
|
||||||
if len(query.all()) > 0:
|
if len(query.all()) > 0:
|
||||||
raise exception.AuditAlreadyExists(
|
raise exception.AuditAlreadyExists(
|
||||||
audit=values['name'])
|
audit=values['name'])
|
||||||
|
|
||||||
if values.get('state') is None:
|
if values.get('state') is None:
|
||||||
values['state'] = objects.audit.State.PENDING
|
values['state'] = objects.audit.State.PENDING
|
||||||
|
|
||||||
if not values.get('auto_trigger'):
|
if not values.get('auto_trigger'):
|
||||||
values['auto_trigger'] = False
|
values['auto_trigger'] = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
audit = self._create(models.Audit, values)
|
audit = self._create(models.Audit, values)
|
||||||
except db_exc.DBDuplicateEntry:
|
except db_exc.DBDuplicateEntry:
|
||||||
raise exception.AuditAlreadyExists(audit=values['uuid'])
|
raise exception.AuditAlreadyExists(audit=values['uuid'])
|
||||||
return audit
|
return audit
|
||||||
|
|
||||||
def _get_audit(self, context, fieldname, value, eager):
|
def _get_audit(self, context, fieldname, value, eager):
|
||||||
try:
|
try:
|
||||||
@@ -726,14 +712,13 @@ class Connection(api.BaseConnection):
|
|||||||
def destroy_audit(self, audit_id):
|
def destroy_audit(self, audit_id):
|
||||||
def is_audit_referenced(session, audit_id):
|
def is_audit_referenced(session, audit_id):
|
||||||
"""Checks whether the audit is referenced by action_plan(s)."""
|
"""Checks whether the audit is referenced by action_plan(s)."""
|
||||||
query = model_query(models.ActionPlan, session=session)
|
query = session.query(models.ActionPlan)
|
||||||
query = self._add_action_plans_filters(
|
query = self._add_action_plans_filters(
|
||||||
query, {'audit_id': audit_id})
|
query, {'audit_id': audit_id})
|
||||||
return query.count() != 0
|
return query.count() != 0
|
||||||
|
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.Audit)
|
||||||
query = model_query(models.Audit, session=session)
|
|
||||||
query = add_identity_filter(query, audit_id)
|
query = add_identity_filter(query, audit_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -800,9 +785,8 @@ class Connection(api.BaseConnection):
|
|||||||
context, fieldname="uuid", value=action_uuid, eager=eager)
|
context, fieldname="uuid", value=action_uuid, eager=eager)
|
||||||
|
|
||||||
def destroy_action(self, action_id):
|
def destroy_action(self, action_id):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.Action)
|
||||||
query = model_query(models.Action, session=session)
|
|
||||||
query = add_identity_filter(query, action_id)
|
query = add_identity_filter(query, action_id)
|
||||||
count = query.delete()
|
count = query.delete()
|
||||||
if count != 1:
|
if count != 1:
|
||||||
@@ -818,9 +802,8 @@ class Connection(api.BaseConnection):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _do_update_action(action_id, values):
|
def _do_update_action(action_id, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.Action)
|
||||||
query = model_query(models.Action, session=session)
|
|
||||||
query = add_identity_filter(query, action_id)
|
query = add_identity_filter(query, action_id)
|
||||||
try:
|
try:
|
||||||
ref = query.with_for_update().one()
|
ref = query.with_for_update().one()
|
||||||
@@ -828,7 +811,7 @@ class Connection(api.BaseConnection):
|
|||||||
raise exception.ActionNotFound(action=action_id)
|
raise exception.ActionNotFound(action=action_id)
|
||||||
|
|
||||||
ref.update(values)
|
ref.update(values)
|
||||||
return ref
|
return ref
|
||||||
|
|
||||||
def soft_delete_action(self, action_id):
|
def soft_delete_action(self, action_id):
|
||||||
try:
|
try:
|
||||||
@@ -872,14 +855,13 @@ class Connection(api.BaseConnection):
|
|||||||
def destroy_action_plan(self, action_plan_id):
|
def destroy_action_plan(self, action_plan_id):
|
||||||
def is_action_plan_referenced(session, action_plan_id):
|
def is_action_plan_referenced(session, action_plan_id):
|
||||||
"""Checks whether the action_plan is referenced by action(s)."""
|
"""Checks whether the action_plan is referenced by action(s)."""
|
||||||
query = model_query(models.Action, session=session)
|
query = session.query(models.Action)
|
||||||
query = self._add_actions_filters(
|
query = self._add_actions_filters(
|
||||||
query, {'action_plan_id': action_plan_id})
|
query, {'action_plan_id': action_plan_id})
|
||||||
return query.count() != 0
|
return query.count() != 0
|
||||||
|
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.ActionPlan)
|
||||||
query = model_query(models.ActionPlan, session=session)
|
|
||||||
query = add_identity_filter(query, action_plan_id)
|
query = add_identity_filter(query, action_plan_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -903,9 +885,8 @@ class Connection(api.BaseConnection):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _do_update_action_plan(action_plan_id, values):
|
def _do_update_action_plan(action_plan_id, values):
|
||||||
session = get_session()
|
with _session_for_write() as session:
|
||||||
with session.begin():
|
query = session.query(models.ActionPlan)
|
||||||
query = model_query(models.ActionPlan, session=session)
|
|
||||||
query = add_identity_filter(query, action_plan_id)
|
query = add_identity_filter(query, action_plan_id)
|
||||||
try:
|
try:
|
||||||
ref = query.with_for_update().one()
|
ref = query.with_for_update().one()
|
||||||
@@ -913,7 +894,7 @@ class Connection(api.BaseConnection):
|
|||||||
raise exception.ActionPlanNotFound(action_plan=action_plan_id)
|
raise exception.ActionPlanNotFound(action_plan=action_plan_id)
|
||||||
|
|
||||||
ref.update(values)
|
ref.update(values)
|
||||||
return ref
|
return ref
|
||||||
|
|
||||||
def soft_delete_action_plan(self, action_plan_id):
|
def soft_delete_action_plan(self, action_plan_id):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -20,9 +20,9 @@ import alembic
|
|||||||
from alembic import config as alembic_config
|
from alembic import config as alembic_config
|
||||||
import alembic.migration as alembic_migration
|
import alembic.migration as alembic_migration
|
||||||
from oslo_db import exception as db_exc
|
from oslo_db import exception as db_exc
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
|
|
||||||
from watcher._i18n import _
|
from watcher._i18n import _
|
||||||
from watcher.db.sqlalchemy import api as sqla_api
|
|
||||||
from watcher.db.sqlalchemy import models
|
from watcher.db.sqlalchemy import models
|
||||||
|
|
||||||
|
|
||||||
@@ -39,7 +39,7 @@ def version(engine=None):
|
|||||||
:rtype: string
|
:rtype: string
|
||||||
"""
|
"""
|
||||||
if engine is None:
|
if engine is None:
|
||||||
engine = sqla_api.get_engine()
|
engine = enginefacade.reader.get_engine()
|
||||||
with engine.connect() as conn:
|
with engine.connect() as conn:
|
||||||
context = alembic_migration.MigrationContext.configure(conn)
|
context = alembic_migration.MigrationContext.configure(conn)
|
||||||
return context.get_current_revision()
|
return context.get_current_revision()
|
||||||
@@ -63,7 +63,7 @@ def create_schema(config=None, engine=None):
|
|||||||
Can be used for initial installation instead of upgrade('head').
|
Can be used for initial installation instead of upgrade('head').
|
||||||
"""
|
"""
|
||||||
if engine is None:
|
if engine is None:
|
||||||
engine = sqla_api.get_engine()
|
engine = enginefacade.writer.get_engine()
|
||||||
|
|
||||||
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
# NOTE(viktors): If we will use metadata.create_all() for non empty db
|
||||||
# schema, it will only add the new tables, but leave
|
# schema, it will only add the new tables, but leave
|
||||||
|
|||||||
@@ -93,14 +93,6 @@ class WatcherBase(models.SoftDeleteMixin,
|
|||||||
d[c.name] = self[c.name]
|
d[c.name] = self[c.name]
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def save(self, session=None):
|
|
||||||
import watcher.db.sqlalchemy.api as db_api
|
|
||||||
|
|
||||||
if session is None:
|
|
||||||
session = db_api.get_session()
|
|
||||||
|
|
||||||
super(WatcherBase, self).save(session)
|
|
||||||
|
|
||||||
|
|
||||||
Base = declarative_base(cls=WatcherBase)
|
Base = declarative_base(cls=WatcherBase)
|
||||||
|
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ class ContinuousAuditHandler(base.AuditHandler):
|
|||||||
self._audit_scheduler = scheduling.BackgroundSchedulerService(
|
self._audit_scheduler = scheduling.BackgroundSchedulerService(
|
||||||
jobstores={
|
jobstores={
|
||||||
'default': job_store.WatcherJobStore(
|
'default': job_store.WatcherJobStore(
|
||||||
engine=sq_api.get_engine()),
|
engine=sq_api.enginefacade.writer.get_engine()),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
return self._audit_scheduler
|
return self._audit_scheduler
|
||||||
|
|||||||
@@ -17,9 +17,10 @@
|
|||||||
|
|
||||||
import fixtures
|
import fixtures
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_db.sqlalchemy import enginefacade
|
||||||
|
|
||||||
|
|
||||||
from watcher.db import api as dbapi
|
from watcher.db import api as dbapi
|
||||||
from watcher.db.sqlalchemy import api as sqla_api
|
|
||||||
from watcher.db.sqlalchemy import migration
|
from watcher.db.sqlalchemy import migration
|
||||||
from watcher.db.sqlalchemy import models
|
from watcher.db.sqlalchemy import models
|
||||||
from watcher.tests import base
|
from watcher.tests import base
|
||||||
@@ -35,16 +36,16 @@ _DB_CACHE = None
|
|||||||
|
|
||||||
class Database(fixtures.Fixture):
|
class Database(fixtures.Fixture):
|
||||||
|
|
||||||
def __init__(self, db_api, db_migrate, sql_connection):
|
def __init__(self, engine, db_migrate, sql_connection):
|
||||||
self.sql_connection = sql_connection
|
self.sql_connection = sql_connection
|
||||||
|
|
||||||
self.engine = db_api.get_engine()
|
self.engine = engine
|
||||||
self.engine.dispose()
|
self.engine.dispose()
|
||||||
conn = self.engine.connect()
|
|
||||||
self.setup_sqlite(db_migrate)
|
|
||||||
self.post_migrations()
|
|
||||||
|
|
||||||
self._DB = "".join(line for line in conn.connection.iterdump())
|
with self.engine.connect() as conn:
|
||||||
|
self.setup_sqlite(db_migrate)
|
||||||
|
self.post_migrations()
|
||||||
|
self._DB = "".join(line for line in conn.connection.iterdump())
|
||||||
self.engine.dispose()
|
self.engine.dispose()
|
||||||
|
|
||||||
def setup_sqlite(self, db_migrate):
|
def setup_sqlite(self, db_migrate):
|
||||||
@@ -55,9 +56,8 @@ class Database(fixtures.Fixture):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(Database, self).setUp()
|
super(Database, self).setUp()
|
||||||
|
with self.engine.connect() as conn:
|
||||||
conn = self.engine.connect()
|
conn.connection.executescript(self._DB)
|
||||||
conn.connection.executescript(self._DB)
|
|
||||||
self.addCleanup(self.engine.dispose)
|
self.addCleanup(self.engine.dispose)
|
||||||
|
|
||||||
def post_migrations(self):
|
def post_migrations(self):
|
||||||
@@ -80,7 +80,9 @@ class DbTestCase(base.TestCase):
|
|||||||
|
|
||||||
global _DB_CACHE
|
global _DB_CACHE
|
||||||
if not _DB_CACHE:
|
if not _DB_CACHE:
|
||||||
_DB_CACHE = Database(sqla_api, migration,
|
engine = enginefacade.writer.get_engine()
|
||||||
|
_DB_CACHE = Database(engine, migration,
|
||||||
sql_connection=CONF.database.connection)
|
sql_connection=CONF.database.connection)
|
||||||
|
engine.dispose()
|
||||||
self.useFixture(_DB_CACHE)
|
self.useFixture(_DB_CACHE)
|
||||||
self._id_gen = utils.id_generator()
|
self._id_gen = utils.id_generator()
|
||||||
|
|||||||
@@ -263,7 +263,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
cfg.CONF.set_override("host", "hostname1")
|
cfg.CONF.set_override("host", "hostname1")
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -286,7 +286,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
self.assertIsNone(self.audits[1].next_run_time)
|
self.assertIsNone(self.audits[1].next_run_time)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -309,7 +309,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
|
|
||||||
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
|
@mock.patch.object(continuous.ContinuousAuditHandler, '_next_cron_time')
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -328,7 +328,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
audit_handler.launch_audits_periodically)
|
audit_handler.launch_audits_periodically)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -349,7 +349,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
m_add_job.assert_has_calls(calls)
|
m_add_job.assert_has_calls(calls)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
@@ -384,7 +384,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
self.assertTrue(is_inactive)
|
self.assertTrue(is_inactive)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
@mock.patch.object(objects.audit.AuditStateTransitionManager,
|
@mock.patch.object(objects.audit.AuditStateTransitionManager,
|
||||||
'is_inactive')
|
'is_inactive')
|
||||||
@@ -406,7 +406,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
self.assertIsNotNone(self.audits[0].next_run_time)
|
self.assertIsNotNone(self.audits[0].next_run_time)
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api.enginefacade.writer, 'get_engine')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'remove_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||||
|
|||||||
Reference in New Issue
Block a user