Merge "Rescheduling continuous audits from FAILED nodes"
This commit is contained in:
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import itertools
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
@@ -40,6 +41,8 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
|
|||||||
|
|
||||||
def get_services_status(self, context):
|
def get_services_status(self, context):
|
||||||
services = objects.service.Service.list(context)
|
services = objects.service.Service.list(context)
|
||||||
|
active_s = objects.service.ServiceStatus.ACTIVE
|
||||||
|
failed_s = objects.service.ServiceStatus.FAILED
|
||||||
for service in services:
|
for service in services:
|
||||||
result = self.get_service_status(context, service.id)
|
result = self.get_service_status(context, service.id)
|
||||||
if service.id not in self.services_status:
|
if service.id not in self.services_status:
|
||||||
@@ -49,6 +52,32 @@ class APISchedulingService(scheduling.BackgroundSchedulerService):
|
|||||||
self.services_status[service.id] = result
|
self.services_status[service.id] = result
|
||||||
notifications.service.send_service_update(context, service,
|
notifications.service.send_service_update(context, service,
|
||||||
state=result)
|
state=result)
|
||||||
|
if result == failed_s:
|
||||||
|
audit_filters = {
|
||||||
|
'audit_type': objects.audit.AuditType.CONTINUOUS.value,
|
||||||
|
'state': objects.audit.State.ONGOING,
|
||||||
|
'hostname': service.host
|
||||||
|
}
|
||||||
|
ongoing_audits = objects.Audit.list(
|
||||||
|
context,
|
||||||
|
filters=audit_filters,
|
||||||
|
eager=True)
|
||||||
|
alive_services = [
|
||||||
|
s.host for s in services
|
||||||
|
if (self.services_status[s.id] == active_s and
|
||||||
|
s.name == 'watcher-decision-engine')]
|
||||||
|
|
||||||
|
round_robin = itertools.cycle(alive_services)
|
||||||
|
for audit in ongoing_audits:
|
||||||
|
audit.hostname = round_robin.__next__()
|
||||||
|
audit.save()
|
||||||
|
LOG.info('Audit %(audit)s has been migrated to '
|
||||||
|
'%(host)s since %(failed_host)s is in'
|
||||||
|
' %(state)s',
|
||||||
|
{'audit': audit.uuid,
|
||||||
|
'host': audit.hostname,
|
||||||
|
'failed_host': service.host,
|
||||||
|
'state': failed_s})
|
||||||
|
|
||||||
def get_service_status(self, context, service_id):
|
def get_service_status(self, context, service_id):
|
||||||
service = objects.Service.get(context, service_id)
|
service = objects.Service.get(context, service_id)
|
||||||
|
|||||||
@@ -375,7 +375,7 @@ class Connection(api.BaseConnection):
|
|||||||
filters = {}
|
filters = {}
|
||||||
|
|
||||||
plain_fields = ['uuid', 'audit_type', 'state', 'goal_id',
|
plain_fields = ['uuid', 'audit_type', 'state', 'goal_id',
|
||||||
'strategy_id']
|
'strategy_id', 'hostname']
|
||||||
join_fieldmap = {
|
join_fieldmap = {
|
||||||
'goal_uuid': ("uuid", models.Goal),
|
'goal_uuid': ("uuid", models.Goal),
|
||||||
'goal_name': ("name", models.Goal),
|
'goal_name': ("name", models.Goal),
|
||||||
|
|||||||
@@ -59,7 +59,8 @@ class ContinuousAuditHandler(base.AuditHandler):
|
|||||||
def _is_audit_inactive(self, audit):
|
def _is_audit_inactive(self, audit):
|
||||||
audit = objects.Audit.get_by_uuid(
|
audit = objects.Audit.get_by_uuid(
|
||||||
self.context_show_deleted, audit.uuid)
|
self.context_show_deleted, audit.uuid)
|
||||||
if objects.audit.AuditStateTransitionManager().is_inactive(audit):
|
if (objects.audit.AuditStateTransitionManager().is_inactive(audit) or
|
||||||
|
audit.hostname != CONF.host):
|
||||||
# if audit isn't in active states, audit's job must be removed to
|
# if audit isn't in active states, audit's job must be removed to
|
||||||
# prevent using of inactive audit in future.
|
# prevent using of inactive audit in future.
|
||||||
jobs = [job for job in self.scheduler.get_jobs()
|
jobs = [job for job in self.scheduler.get_jobs()
|
||||||
@@ -125,28 +126,31 @@ class ContinuousAuditHandler(base.AuditHandler):
|
|||||||
'state__in': (objects.audit.State.PENDING,
|
'state__in': (objects.audit.State.PENDING,
|
||||||
objects.audit.State.ONGOING,
|
objects.audit.State.ONGOING,
|
||||||
objects.audit.State.SUCCEEDED),
|
objects.audit.State.SUCCEEDED),
|
||||||
'hostname__in': (None, CONF.host)
|
|
||||||
}
|
}
|
||||||
audits = objects.Audit.list(
|
audit_filters['hostname'] = None
|
||||||
|
unscheduled_audits = objects.Audit.list(
|
||||||
audit_context, filters=audit_filters, eager=True)
|
audit_context, filters=audit_filters, eager=True)
|
||||||
for audit in audits:
|
for audit in unscheduled_audits:
|
||||||
# If continuous audit doesn't have a hostname yet,
|
# If continuous audit doesn't have a hostname yet,
|
||||||
# Watcher will set current CONF.host value.
|
# Watcher will set current CONF.host value.
|
||||||
if audit.hostname is None:
|
# TODO(alexchadin): Add scheduling of new continuous audits.
|
||||||
audit.hostname = CONF.host
|
audit.hostname = CONF.host
|
||||||
audit.save()
|
audit.save()
|
||||||
# Let's remove this audit from current execution
|
|
||||||
# and execute it as usual Audit with hostname later.
|
|
||||||
audits.remove(audit)
|
|
||||||
scheduler_job_args = [
|
scheduler_job_args = [
|
||||||
(job.args[0].uuid, job) for job
|
(job.args[0].uuid, job) for job
|
||||||
in self.scheduler.get_jobs()
|
in self.scheduler.get_jobs()
|
||||||
if job.name == 'execute_audit']
|
if job.name == 'execute_audit']
|
||||||
scheduler_jobs = dict(scheduler_job_args)
|
scheduler_jobs = dict(scheduler_job_args)
|
||||||
# if audit isn't in active states, audit's job should be removed
|
# if audit isn't in active states, audit's job should be removed
|
||||||
|
jobs_to_remove = []
|
||||||
for job in scheduler_jobs.values():
|
for job in scheduler_jobs.values():
|
||||||
if self._is_audit_inactive(job.args[0]):
|
if self._is_audit_inactive(job.args[0]):
|
||||||
scheduler_jobs.pop(job.args[0].uuid)
|
jobs_to_remove.append(job.args[0].uuid)
|
||||||
|
for audit_uuid in jobs_to_remove:
|
||||||
|
scheduler_jobs.pop(audit_uuid)
|
||||||
|
audit_filters['hostname'] = CONF.host
|
||||||
|
audits = objects.Audit.list(
|
||||||
|
audit_context, filters=audit_filters, eager=True)
|
||||||
for audit in audits:
|
for audit in audits:
|
||||||
existing_job = scheduler_jobs.get(audit.uuid, None)
|
existing_job = scheduler_jobs.get(audit.uuid, None)
|
||||||
# if audit is not presented in scheduled audits yet,
|
# if audit is not presented in scheduled audits yet,
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ class TestCancelOngoingActionPlans(db_base.DbTestCase):
|
|||||||
self.context,
|
self.context,
|
||||||
action_plan_id=1,
|
action_plan_id=1,
|
||||||
state=objects.action.State.PENDING)
|
state=objects.action.State.PENDING)
|
||||||
cfg.CONF.set_override('host', 'hostname1')
|
cfg.CONF.set_override("host", "hostname1")
|
||||||
|
|
||||||
@mock.patch.object(objects.action.Action, 'save')
|
@mock.patch.object(objects.action.Action, 'save')
|
||||||
@mock.patch.object(objects.action_plan.ActionPlan, 'save')
|
@mock.patch.object(objects.action_plan.ActionPlan, 'save')
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
from oslo_config import cfg
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
from apscheduler import job
|
from apscheduler import job
|
||||||
@@ -242,8 +243,10 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
|||||||
audit_template_id=audit_template.id,
|
audit_template_id=audit_template.id,
|
||||||
goal_id=self.goal.id,
|
goal_id=self.goal.id,
|
||||||
audit_type=objects.audit.AuditType.CONTINUOUS.value,
|
audit_type=objects.audit.AuditType.CONTINUOUS.value,
|
||||||
goal=self.goal)
|
goal=self.goal,
|
||||||
|
hostname='hostname1')
|
||||||
for id_ in range(2, 4)]
|
for id_ in range(2, 4)]
|
||||||
|
cfg.CONF.set_override("host", "hostname1")
|
||||||
|
|
||||||
@mock.patch.object(objects.service.Service, 'list')
|
@mock.patch.object(objects.service.Service, 'list')
|
||||||
@mock.patch.object(sq_api, 'get_engine')
|
@mock.patch.object(sq_api, 'get_engine')
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ class TestCancelOngoingAudits(db_base.DbTestCase):
|
|||||||
goal=self.goal,
|
goal=self.goal,
|
||||||
hostname='hostname1',
|
hostname='hostname1',
|
||||||
state=objects.audit.State.ONGOING)
|
state=objects.audit.State.ONGOING)
|
||||||
cfg.CONF.set_override('host', 'hostname1')
|
cfg.CONF.set_override("host", "hostname1")
|
||||||
|
|
||||||
@mock.patch.object(objects.audit.Audit, 'save')
|
@mock.patch.object(objects.audit.Audit, 'save')
|
||||||
@mock.patch.object(objects.audit.Audit, 'list')
|
@mock.patch.object(objects.audit.Audit, 'list')
|
||||||
|
|||||||
Reference in New Issue
Block a user