From d6f169197efc5b4f6c8a2e6bc38177b0641ca05c Mon Sep 17 00:00:00 2001 From: Takashi Kajinami Date: Wed, 8 May 2024 15:24:52 +0900 Subject: [PATCH] SQLAlchemy 2.0: Omnibus fixes patch This was originally five patches, but they are all needed to pass any of the test jobs now, so they have been squashed into one: Co-Authored-By: Dan Smith (dms@danplanet.com) First: The autoload argument was removed[1] in SQLAlchemy and only the autoload_with argument should be passed. The autoload argument is set according to the autoload_with argument automatically even in SQLAlchemy 1.x[2] so is not at all needed. [1] https://github.com/sqlalchemy/sqlalchemy/commit/c932123bacad9bf047d160b85e3f95d396c513ae [2] https://github.com/sqlalchemy/sqlalchemy/commit/ad8f921e969b6f735dc8e08d882c961dde78f2b1 Second: Remove _warn_on_bytestring for newer SA, AFAICT, this flag has been removed from SQLAlchemy and that is why watcher-db-manage fails to initialize the DB for me on jammy. This migration was passing the default value (=False) anyway, so I assume this is the right "fix". Third: Fix joinedload passing string attribute names Fourth: Fix engine.select pattern to use begin() per the migration guide. Fifth: Override the apscheduler get_next_run_time() which appears to be trivially not compatible with SQLAlchemy 2.0 because of a return type from scalar(). Change-Id: I000e5e78f97f82ed4ea64d42f1c38354c3252e08 --- .../0f6042416884_add_apscheduler_jobs.py | 2 +- watcher/db/sqlalchemy/api.py | 3 +- watcher/db/sqlalchemy/job_store.py | 41 +++++++++++++------ 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py b/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py index 5f2854313..98773adf5 100644 --- a/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py +++ b/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py @@ -28,7 +28,7 @@ def upgrade(): op.create_table( 'apscheduler_jobs', - sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False), + sa.Column('id', sa.Unicode(191), nullable=False), sa.Column('next_run_time', sa.Float(25), index=True), sa.Column('job_state', sa.LargeBinary, nullable=False), diff --git a/watcher/db/sqlalchemy/api.py b/watcher/db/sqlalchemy/api.py index 4bc30bdae..50d66a8c6 100644 --- a/watcher/db/sqlalchemy/api.py +++ b/watcher/db/sqlalchemy/api.py @@ -244,7 +244,8 @@ class Connection(api.BaseConnection): for relationship in relationships: if not relationship.uselist: # We have a One-to-X relationship - query = query.options(joinedload(relationship.key)) + query = query.options(joinedload( + getattr(model, relationship.key))) return query def _create(self, model, values): diff --git a/watcher/db/sqlalchemy/job_store.py b/watcher/db/sqlalchemy/job_store.py index da5028f43..92e631ada 100644 --- a/watcher/db/sqlalchemy/job_store.py +++ b/watcher/db/sqlalchemy/job_store.py @@ -22,6 +22,7 @@ from apscheduler.jobstores.base import ConflictingIdError from apscheduler.jobstores import sqlalchemy from apscheduler.util import datetime_to_utc_timestamp from apscheduler.util import maybe_ref +from apscheduler.util import utc_timestamp_to_datetime from watcher.common import context from watcher.common import service @@ -32,7 +33,7 @@ try: except ImportError: # pragma: nocover import pickle -from sqlalchemy import Table, MetaData, select, and_ +from sqlalchemy import Table, MetaData, select, and_, null from sqlalchemy.exc import IntegrityError @@ -58,8 +59,7 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): super(WatcherJobStore, self).__init__(url, engine, tablename, metadata, pickle_protocol) metadata = maybe_ref(metadata) or MetaData() - self.jobs_t = Table(tablename, metadata, autoload=True, - autoload_with=engine) + self.jobs_t = Table(tablename, metadata, autoload_with=engine) service_ident = service.ServiceHeartbeat.get_service_name() self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]} self.service_id = objects.Service.list(context=context.make_context(), @@ -79,7 +79,8 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): 'tag': jsonutils.dumps(self.tag) }) try: - self.engine.execute(insert) + with self.engine.begin() as conn: + conn.execute(insert) except IntegrityError: raise ConflictingIdError(job.id) @@ -88,20 +89,36 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): self._fix_paused_jobs_sorting(jobs) return jobs + def get_next_run_time(self): + selectable = select(self.jobs_t.c.next_run_time).\ + where(self.jobs_t.c.next_run_time != null()).\ + order_by(self.jobs_t.c.next_run_time).limit(1) + with self.engine.begin() as connection: + # NOTE(danms): The apscheduler implementation of this gets a + # decimal.Decimal back from scalar() which causes + # utc_timestamp_to_datetime() to choke since it is expecting a + # python float. Assume this is SQLAlchemy 2.0 stuff, so just + # coerce to a float here. + next_run_time = connection.execute(selectable).scalar() + return utc_timestamp_to_datetime(float(next_run_time) + if next_run_time is not None + else None) + def _get_jobs(self, *conditions): jobs = [] conditions += (self.jobs_t.c.service_id == self.service_id,) selectable = select( - [self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag] + self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions)) failed_job_ids = set() - for row in self.engine.execute(selectable): - try: - jobs.append(self._reconstitute_job(row.job_state)) - except Exception: - self._logger.exception( - 'Unable to restore job "%s" -- removing it', row.id) - failed_job_ids.add(row.id) + with self.engine.begin() as conn: + for row in conn.execute(selectable): + try: + jobs.append(self._reconstitute_job(row.job_state)) + except Exception: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', row.id) + failed_job_ids.add(row.id) # Remove all the jobs we failed to restore if failed_job_ids: