diff --git a/watcher/api/scheduling.py b/watcher/api/scheduling.py index adeccd703..3f72b0e88 100644 --- a/watcher/api/scheduling.py +++ b/watcher/api/scheduling.py @@ -16,6 +16,7 @@ import datetime +import itertools from oslo_config import cfg from oslo_log import log from oslo_utils import timeutils @@ -40,6 +41,8 @@ class APISchedulingService(scheduling.BackgroundSchedulerService): def get_services_status(self, context): services = objects.service.Service.list(context) + active_s = objects.service.ServiceStatus.ACTIVE + failed_s = objects.service.ServiceStatus.FAILED for service in services: result = self.get_service_status(context, service.id) if service.id not in self.services_status: @@ -49,6 +52,32 @@ class APISchedulingService(scheduling.BackgroundSchedulerService): self.services_status[service.id] = result notifications.service.send_service_update(context, service, state=result) + if result == failed_s: + audit_filters = { + 'audit_type': objects.audit.AuditType.CONTINUOUS.value, + 'state': objects.audit.State.ONGOING, + 'hostname': service.host + } + ongoing_audits = objects.Audit.list( + context, + filters=audit_filters, + eager=True) + alive_services = [ + s.host for s in services + if (self.services_status[s.id] == active_s and + s.name == 'watcher-decision-engine')] + + round_robin = itertools.cycle(alive_services) + for audit in ongoing_audits: + audit.hostname = round_robin.__next__() + audit.save() + LOG.info('Audit %(audit)s has been migrated to ' + '%(host)s since %(failed_host)s is in' + ' %(state)s', + {'audit': audit.uuid, + 'host': audit.hostname, + 'failed_host': service.host, + 'state': failed_s}) def get_service_status(self, context, service_id): service = objects.Service.get(context, service_id) diff --git a/watcher/db/sqlalchemy/api.py b/watcher/db/sqlalchemy/api.py index 1366054ff..ee641d34b 100644 --- a/watcher/db/sqlalchemy/api.py +++ b/watcher/db/sqlalchemy/api.py @@ -375,7 +375,7 @@ class Connection(api.BaseConnection): filters = {} plain_fields = ['uuid', 'audit_type', 'state', 'goal_id', - 'strategy_id'] + 'strategy_id', 'hostname'] join_fieldmap = { 'goal_uuid': ("uuid", models.Goal), 'goal_name': ("name", models.Goal), diff --git a/watcher/decision_engine/audit/continuous.py b/watcher/decision_engine/audit/continuous.py index 3a86f2c5e..50e00c054 100644 --- a/watcher/decision_engine/audit/continuous.py +++ b/watcher/decision_engine/audit/continuous.py @@ -59,7 +59,8 @@ class ContinuousAuditHandler(base.AuditHandler): def _is_audit_inactive(self, audit): audit = objects.Audit.get_by_uuid( self.context_show_deleted, audit.uuid) - if objects.audit.AuditStateTransitionManager().is_inactive(audit): + if (objects.audit.AuditStateTransitionManager().is_inactive(audit) or + audit.hostname != CONF.host): # if audit isn't in active states, audit's job must be removed to # prevent using of inactive audit in future. jobs = [job for job in self.scheduler.get_jobs() @@ -125,28 +126,31 @@ class ContinuousAuditHandler(base.AuditHandler): 'state__in': (objects.audit.State.PENDING, objects.audit.State.ONGOING, objects.audit.State.SUCCEEDED), - 'hostname__in': (None, CONF.host) } - audits = objects.Audit.list( + audit_filters['hostname'] = None + unscheduled_audits = objects.Audit.list( audit_context, filters=audit_filters, eager=True) - for audit in audits: + for audit in unscheduled_audits: # If continuous audit doesn't have a hostname yet, # Watcher will set current CONF.host value. - if audit.hostname is None: - audit.hostname = CONF.host - audit.save() - # Let's remove this audit from current execution - # and execute it as usual Audit with hostname later. - audits.remove(audit) + # TODO(alexchadin): Add scheduling of new continuous audits. + audit.hostname = CONF.host + audit.save() scheduler_job_args = [ (job.args[0].uuid, job) for job in self.scheduler.get_jobs() if job.name == 'execute_audit'] scheduler_jobs = dict(scheduler_job_args) # if audit isn't in active states, audit's job should be removed + jobs_to_remove = [] for job in scheduler_jobs.values(): if self._is_audit_inactive(job.args[0]): - scheduler_jobs.pop(job.args[0].uuid) + jobs_to_remove.append(job.args[0].uuid) + for audit_uuid in jobs_to_remove: + scheduler_jobs.pop(audit_uuid) + audit_filters['hostname'] = CONF.host + audits = objects.Audit.list( + audit_context, filters=audit_filters, eager=True) for audit in audits: existing_job = scheduler_jobs.get(audit.uuid, None) # if audit is not presented in scheduled audits yet, diff --git a/watcher/tests/applier/test_sync.py b/watcher/tests/applier/test_sync.py index 2bf814ab2..9c5c0b87f 100644 --- a/watcher/tests/applier/test_sync.py +++ b/watcher/tests/applier/test_sync.py @@ -66,7 +66,7 @@ class TestCancelOngoingActionPlans(db_base.DbTestCase): self.context, action_plan_id=1, state=objects.action.State.PENDING) - cfg.CONF.set_override('host', 'hostname1') + cfg.CONF.set_override("host", "hostname1") @mock.patch.object(objects.action.Action, 'save') @mock.patch.object(objects.action_plan.ActionPlan, 'save') diff --git a/watcher/tests/decision_engine/audit/test_audit_handlers.py b/watcher/tests/decision_engine/audit/test_audit_handlers.py index 1e3f2aa4f..4922fd750 100644 --- a/watcher/tests/decision_engine/audit/test_audit_handlers.py +++ b/watcher/tests/decision_engine/audit/test_audit_handlers.py @@ -17,6 +17,7 @@ import datetime import mock +from oslo_config import cfg from oslo_utils import uuidutils from apscheduler import job @@ -242,8 +243,10 @@ class TestContinuousAuditHandler(base.DbTestCase): audit_template_id=audit_template.id, goal_id=self.goal.id, audit_type=objects.audit.AuditType.CONTINUOUS.value, - goal=self.goal) + goal=self.goal, + hostname='hostname1') for id_ in range(2, 4)] + cfg.CONF.set_override("host", "hostname1") @mock.patch.object(objects.service.Service, 'list') @mock.patch.object(sq_api, 'get_engine') diff --git a/watcher/tests/decision_engine/test_scheduling.py b/watcher/tests/decision_engine/test_scheduling.py index 345154431..278aa28a2 100644 --- a/watcher/tests/decision_engine/test_scheduling.py +++ b/watcher/tests/decision_engine/test_scheduling.py @@ -62,7 +62,7 @@ class TestCancelOngoingAudits(db_base.DbTestCase): goal=self.goal, hostname='hostname1', state=objects.audit.State.ONGOING) - cfg.CONF.set_override('host', 'hostname1') + cfg.CONF.set_override("host", "hostname1") @mock.patch.object(objects.audit.Audit, 'save') @mock.patch.object(objects.audit.Audit, 'list')