Compare commits

..

1 Commit

Author SHA1 Message Date
James Page
6b433b3547 Fix oslo.db >= 15.0.0 compatibility
Minimal refactor of SQLAlchemy api module to be compatible with
oslo.db >= 15.0.0 where autocommit behaviour was dropped.

Closes-Bug: #2056181
Change-Id: I33be53f647faae2aad30a43c10980df950d5d7c2
(cherry picked from commit bc5922c684)
2024-03-27 15:14:22 +00:00
4 changed files with 16 additions and 38 deletions

View File

@@ -3,16 +3,15 @@
# Andi Chandler <andi@gowling.com>, 2020. #zanata # Andi Chandler <andi@gowling.com>, 2020. #zanata
# Andi Chandler <andi@gowling.com>, 2022. #zanata # Andi Chandler <andi@gowling.com>, 2022. #zanata
# Andi Chandler <andi@gowling.com>, 2023. #zanata # Andi Chandler <andi@gowling.com>, 2023. #zanata
# Andi Chandler <andi@gowling.com>, 2024. #zanata
msgid "" msgid ""
msgstr "" msgstr ""
"Project-Id-Version: python-watcher\n" "Project-Id-Version: python-watcher\n"
"Report-Msgid-Bugs-To: \n" "Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2024-05-31 14:40+0000\n" "POT-Creation-Date: 2023-08-14 03:05+0000\n"
"MIME-Version: 1.0\n" "MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n" "Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n" "Content-Transfer-Encoding: 8bit\n"
"PO-Revision-Date: 2024-04-18 12:21+0000\n" "PO-Revision-Date: 2023-06-21 07:54+0000\n"
"Last-Translator: Andi Chandler <andi@gowling.com>\n" "Last-Translator: Andi Chandler <andi@gowling.com>\n"
"Language-Team: English (United Kingdom)\n" "Language-Team: English (United Kingdom)\n"
"Language: en_GB\n" "Language: en_GB\n"
@@ -64,9 +63,6 @@ msgstr "2.0.0"
msgid "2023.1 Series Release Notes" msgid "2023.1 Series Release Notes"
msgstr "2023.1 Series Release Notes" msgstr "2023.1 Series Release Notes"
msgid "2023.2 Series Release Notes"
msgstr "2023.2 Series Release Notes"
msgid "3.0.0" msgid "3.0.0"
msgstr "3.0.0" msgstr "3.0.0"

View File

@@ -28,7 +28,7 @@ def upgrade():
op.create_table( op.create_table(
'apscheduler_jobs', 'apscheduler_jobs',
sa.Column('id', sa.Unicode(191), sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False),
nullable=False), nullable=False),
sa.Column('next_run_time', sa.Float(25), index=True), sa.Column('next_run_time', sa.Float(25), index=True),
sa.Column('job_state', sa.LargeBinary, nullable=False), sa.Column('job_state', sa.LargeBinary, nullable=False),

View File

@@ -244,8 +244,7 @@ class Connection(api.BaseConnection):
for relationship in relationships: for relationship in relationships:
if not relationship.uselist: if not relationship.uselist:
# We have a One-to-X relationship # We have a One-to-X relationship
query = query.options(joinedload( query = query.options(joinedload(relationship.key))
getattr(model, relationship.key)))
return query return query
def _create(self, model, values): def _create(self, model, values):

View File

@@ -22,7 +22,6 @@ from apscheduler.jobstores.base import ConflictingIdError
from apscheduler.jobstores import sqlalchemy from apscheduler.jobstores import sqlalchemy
from apscheduler.util import datetime_to_utc_timestamp from apscheduler.util import datetime_to_utc_timestamp
from apscheduler.util import maybe_ref from apscheduler.util import maybe_ref
from apscheduler.util import utc_timestamp_to_datetime
from watcher.common import context from watcher.common import context
from watcher.common import service from watcher.common import service
@@ -33,7 +32,7 @@ try:
except ImportError: # pragma: nocover except ImportError: # pragma: nocover
import pickle import pickle
from sqlalchemy import Table, MetaData, select, and_, null from sqlalchemy import Table, MetaData, select, and_
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@@ -59,7 +58,8 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
super(WatcherJobStore, self).__init__(url, engine, tablename, super(WatcherJobStore, self).__init__(url, engine, tablename,
metadata, pickle_protocol) metadata, pickle_protocol)
metadata = maybe_ref(metadata) or MetaData() metadata = maybe_ref(metadata) or MetaData()
self.jobs_t = Table(tablename, metadata, autoload_with=engine) self.jobs_t = Table(tablename, metadata, autoload=True,
autoload_with=engine)
service_ident = service.ServiceHeartbeat.get_service_name() service_ident = service.ServiceHeartbeat.get_service_name()
self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]} self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
self.service_id = objects.Service.list(context=context.make_context(), self.service_id = objects.Service.list(context=context.make_context(),
@@ -79,8 +79,7 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
'tag': jsonutils.dumps(self.tag) 'tag': jsonutils.dumps(self.tag)
}) })
try: try:
with self.engine.begin() as conn: self.engine.execute(insert)
conn.execute(insert)
except IntegrityError: except IntegrityError:
raise ConflictingIdError(job.id) raise ConflictingIdError(job.id)
@@ -89,36 +88,20 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
self._fix_paused_jobs_sorting(jobs) self._fix_paused_jobs_sorting(jobs)
return jobs return jobs
def get_next_run_time(self):
selectable = select(self.jobs_t.c.next_run_time).\
where(self.jobs_t.c.next_run_time != null()).\
order_by(self.jobs_t.c.next_run_time).limit(1)
with self.engine.begin() as connection:
# NOTE(danms): The apscheduler implementation of this gets a
# decimal.Decimal back from scalar() which causes
# utc_timestamp_to_datetime() to choke since it is expecting a
# python float. Assume this is SQLAlchemy 2.0 stuff, so just
# coerce to a float here.
next_run_time = connection.execute(selectable).scalar()
return utc_timestamp_to_datetime(float(next_run_time)
if next_run_time is not None
else None)
def _get_jobs(self, *conditions): def _get_jobs(self, *conditions):
jobs = [] jobs = []
conditions += (self.jobs_t.c.service_id == self.service_id,) conditions += (self.jobs_t.c.service_id == self.service_id,)
selectable = select( selectable = select(
self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag [self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag]
).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions)) ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
failed_job_ids = set() failed_job_ids = set()
with self.engine.begin() as conn: for row in self.engine.execute(selectable):
for row in conn.execute(selectable): try:
try: jobs.append(self._reconstitute_job(row.job_state))
jobs.append(self._reconstitute_job(row.job_state)) except Exception:
except Exception: self._logger.exception(
self._logger.exception( 'Unable to restore job "%s" -- removing it', row.id)
'Unable to restore job "%s" -- removing it', row.id) failed_job_ids.add(row.id)
failed_job_ids.add(row.id)
# Remove all the jobs we failed to restore # Remove all the jobs we failed to restore
if failed_job_ids: if failed_job_ids: