diff --git a/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py b/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py index 5f2854313..98773adf5 100644 --- a/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py +++ b/watcher/db/sqlalchemy/alembic/versions/0f6042416884_add_apscheduler_jobs.py @@ -28,7 +28,7 @@ def upgrade(): op.create_table( 'apscheduler_jobs', - sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False), + sa.Column('id', sa.Unicode(191), nullable=False), sa.Column('next_run_time', sa.Float(25), index=True), sa.Column('job_state', sa.LargeBinary, nullable=False), diff --git a/watcher/db/sqlalchemy/api.py b/watcher/db/sqlalchemy/api.py index 4bc30bdae..50d66a8c6 100644 --- a/watcher/db/sqlalchemy/api.py +++ b/watcher/db/sqlalchemy/api.py @@ -244,7 +244,8 @@ class Connection(api.BaseConnection): for relationship in relationships: if not relationship.uselist: # We have a One-to-X relationship - query = query.options(joinedload(relationship.key)) + query = query.options(joinedload( + getattr(model, relationship.key))) return query def _create(self, model, values): diff --git a/watcher/db/sqlalchemy/job_store.py b/watcher/db/sqlalchemy/job_store.py index da5028f43..92e631ada 100644 --- a/watcher/db/sqlalchemy/job_store.py +++ b/watcher/db/sqlalchemy/job_store.py @@ -22,6 +22,7 @@ from apscheduler.jobstores.base import ConflictingIdError from apscheduler.jobstores import sqlalchemy from apscheduler.util import datetime_to_utc_timestamp from apscheduler.util import maybe_ref +from apscheduler.util import utc_timestamp_to_datetime from watcher.common import context from watcher.common import service @@ -32,7 +33,7 @@ try: except ImportError: # pragma: nocover import pickle -from sqlalchemy import Table, MetaData, select, and_ +from sqlalchemy import Table, MetaData, select, and_, null from sqlalchemy.exc import IntegrityError @@ -58,8 +59,7 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): super(WatcherJobStore, self).__init__(url, engine, tablename, metadata, pickle_protocol) metadata = maybe_ref(metadata) or MetaData() - self.jobs_t = Table(tablename, metadata, autoload=True, - autoload_with=engine) + self.jobs_t = Table(tablename, metadata, autoload_with=engine) service_ident = service.ServiceHeartbeat.get_service_name() self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]} self.service_id = objects.Service.list(context=context.make_context(), @@ -79,7 +79,8 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): 'tag': jsonutils.dumps(self.tag) }) try: - self.engine.execute(insert) + with self.engine.begin() as conn: + conn.execute(insert) except IntegrityError: raise ConflictingIdError(job.id) @@ -88,20 +89,36 @@ class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): self._fix_paused_jobs_sorting(jobs) return jobs + def get_next_run_time(self): + selectable = select(self.jobs_t.c.next_run_time).\ + where(self.jobs_t.c.next_run_time != null()).\ + order_by(self.jobs_t.c.next_run_time).limit(1) + with self.engine.begin() as connection: + # NOTE(danms): The apscheduler implementation of this gets a + # decimal.Decimal back from scalar() which causes + # utc_timestamp_to_datetime() to choke since it is expecting a + # python float. Assume this is SQLAlchemy 2.0 stuff, so just + # coerce to a float here. + next_run_time = connection.execute(selectable).scalar() + return utc_timestamp_to_datetime(float(next_run_time) + if next_run_time is not None + else None) + def _get_jobs(self, *conditions): jobs = [] conditions += (self.jobs_t.c.service_id == self.service_id,) selectable = select( - [self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag] + self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions)) failed_job_ids = set() - for row in self.engine.execute(selectable): - try: - jobs.append(self._reconstitute_job(row.job_state)) - except Exception: - self._logger.exception( - 'Unable to restore job "%s" -- removing it', row.id) - failed_job_ids.add(row.id) + with self.engine.begin() as conn: + for row in conn.execute(selectable): + try: + jobs.append(self._reconstitute_job(row.job_state)) + except Exception: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', row.id) + failed_job_ids.add(row.id) # Remove all the jobs we failed to restore if failed_job_ids: