Merge "Add HA support"
This commit is contained in:
@@ -20,6 +20,7 @@
|
||||
import abc
|
||||
import six
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.applier import rpcapi
|
||||
@@ -31,6 +32,7 @@ from watcher import notifications
|
||||
from watcher import objects
|
||||
from watcher.objects import fields
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -120,6 +122,8 @@ class AuditHandler(BaseAuditHandler):
|
||||
def pre_execute(self, audit, request_context):
|
||||
LOG.debug("Trigger audit %s", audit.uuid)
|
||||
self.check_ongoing_action_plans(request_context)
|
||||
# Write hostname that will execute this audit.
|
||||
audit.hostname = CONF.host
|
||||
# change state of the audit to ONGOING
|
||||
self.update_audit_state(audit, objects.audit.State.ONGOING)
|
||||
|
||||
|
||||
@@ -124,10 +124,20 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
'audit_type': objects.audit.AuditType.CONTINUOUS.value,
|
||||
'state__in': (objects.audit.State.PENDING,
|
||||
objects.audit.State.ONGOING,
|
||||
objects.audit.State.SUCCEEDED)
|
||||
objects.audit.State.SUCCEEDED),
|
||||
'hostname__in': (None, CONF.host)
|
||||
}
|
||||
audits = objects.Audit.list(
|
||||
audit_context, filters=audit_filters, eager=True)
|
||||
for audit in audits:
|
||||
# If continuous audit doesn't have a hostname yet,
|
||||
# Watcher will set current CONF.host value.
|
||||
if audit.hostname is None:
|
||||
audit.hostname = CONF.host
|
||||
audit.save()
|
||||
# Let's remove this audit from current execution
|
||||
# and execute it as usual Audit with hostname later.
|
||||
audits.remove(audit)
|
||||
scheduler_job_args = [
|
||||
(job.args[0].uuid, job) for job
|
||||
in self.scheduler.get_jobs()
|
||||
@@ -173,6 +183,7 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
audit.next_run_time = self._next_cron_time(audit)
|
||||
self._add_job('date', audit, audit_context,
|
||||
run_date=audit.next_run_time)
|
||||
audit.hostname = CONF.host
|
||||
audit.save()
|
||||
|
||||
def start(self):
|
||||
|
||||
@@ -88,10 +88,31 @@ class DecisionEngineSchedulingService(scheduling.BackgroundSchedulerService):
|
||||
seconds=interval,
|
||||
next_run_time=datetime.datetime.now())
|
||||
|
||||
def cancel_ongoing_audits(self):
|
||||
audit_filters = {
|
||||
'audit_type': objects.audit.AuditType.ONESHOT.value,
|
||||
'state': objects.audit.State.ONGOING,
|
||||
'hostname': CONF.host
|
||||
}
|
||||
local_context = context.make_context()
|
||||
ongoing_audits = objects.Audit.list(
|
||||
local_context,
|
||||
filters=audit_filters)
|
||||
for audit in ongoing_audits:
|
||||
audit.state = objects.audit.State.CANCELLED
|
||||
audit.save()
|
||||
LOG.info("Audit %(uuid)s has been cancelled because it was in "
|
||||
"%(state)s state when Decision Engine had been stopped "
|
||||
"on %(hostname)s host.",
|
||||
{'uuid': audit.uuid,
|
||||
'state': objects.audit.State.ONGOING,
|
||||
'hostname': audit.hostname})
|
||||
|
||||
def start(self):
|
||||
"""Start service."""
|
||||
self.add_sync_jobs()
|
||||
self.add_checkstate_job()
|
||||
self.cancel_ongoing_audits()
|
||||
super(DecisionEngineSchedulingService, self).start()
|
||||
|
||||
def stop(self):
|
||||
|
||||
Reference in New Issue
Block a user