Merge "Add Watcher JobStore for background jobs"
This commit is contained in:
@@ -112,16 +112,19 @@ class WSGIService(service.ServiceBase):
|
||||
|
||||
class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
|
||||
|
||||
service_name = None
|
||||
|
||||
def __init__(self, gconfig=None, service_name=None, **kwargs):
|
||||
gconfig = None or {}
|
||||
super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
|
||||
self.service_name = service_name
|
||||
ServiceHeartbeat.service_name = service_name
|
||||
self.context = context.make_context()
|
||||
self.send_beat()
|
||||
|
||||
def send_beat(self):
|
||||
host = CONF.host
|
||||
watcher_list = objects.Service.list(
|
||||
self.context, filters={'name': self.service_name,
|
||||
self.context, filters={'name': ServiceHeartbeat.service_name,
|
||||
'host': host})
|
||||
if watcher_list:
|
||||
watcher_service = watcher_list[0]
|
||||
@@ -129,7 +132,7 @@ class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
|
||||
watcher_service.save()
|
||||
else:
|
||||
watcher_service = objects.Service(self.context)
|
||||
watcher_service.name = self.service_name
|
||||
watcher_service.name = ServiceHeartbeat.service_name
|
||||
watcher_service.host = host
|
||||
watcher_service.create()
|
||||
|
||||
@@ -137,6 +140,10 @@ class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
|
||||
self.add_job(self.send_beat, 'interval', seconds=60,
|
||||
next_run_time=datetime.datetime.now())
|
||||
|
||||
@classmethod
|
||||
def get_service_name(cls):
|
||||
return CONF.host, cls.service_name
|
||||
|
||||
def start(self):
|
||||
"""Start service."""
|
||||
self.add_heartbeat_job()
|
||||
@@ -170,6 +177,13 @@ class Service(service.ServiceBase):
|
||||
self.conductor_topic = self.manager.conductor_topic
|
||||
self.notification_topics = self.manager.notification_topics
|
||||
|
||||
self.heartbeat = None
|
||||
|
||||
self.service_name = self.manager.service_name
|
||||
if self.service_name:
|
||||
self.heartbeat = ServiceHeartbeat(
|
||||
service_name=self.manager.service_name)
|
||||
|
||||
self.conductor_endpoints = [
|
||||
ep(self) for ep in self.manager.conductor_endpoints
|
||||
]
|
||||
@@ -185,8 +199,6 @@ class Service(service.ServiceBase):
|
||||
self.conductor_topic_handler = None
|
||||
self.notification_handler = None
|
||||
|
||||
self.heartbeat = None
|
||||
|
||||
if self.conductor_topic and self.conductor_endpoints:
|
||||
self.conductor_topic_handler = self.build_topic_handler(
|
||||
self.conductor_topic, self.conductor_endpoints)
|
||||
@@ -194,10 +206,6 @@ class Service(service.ServiceBase):
|
||||
self.notification_handler = self.build_notification_handler(
|
||||
self.notification_topics, self.notification_endpoints
|
||||
)
|
||||
self.service_name = self.manager.service_name
|
||||
if self.service_name:
|
||||
self.heartbeat = ServiceHeartbeat(
|
||||
service_name=self.manager.service_name)
|
||||
|
||||
@property
|
||||
def transport(self):
|
||||
|
||||
33
watcher/db/sqlalchemy/alembic/versions/0f6042416884_.py
Normal file
33
watcher/db/sqlalchemy/alembic/versions/0f6042416884_.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""Add apscheduler_jobs table to store background jobs
|
||||
|
||||
Revision ID: 0f6042416884
|
||||
Revises: 001
|
||||
Create Date: 2017-03-24 11:21:29.036532
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from watcher.db.sqlalchemy import models
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0f6042416884'
|
||||
down_revision = '001'
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'apscheduler_jobs',
|
||||
sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False),
|
||||
nullable=False),
|
||||
sa.Column('next_run_time', sa.Float(25), index=True),
|
||||
sa.Column('job_state', sa.LargeBinary, nullable=False),
|
||||
sa.Column('service_id', sa.Integer(), nullable=False),
|
||||
sa.Column('tag', models.JSONEncodedDict(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.ForeignKeyConstraint(['service_id'], ['services.id'])
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('apscheduler_jobs')
|
||||
112
watcher/db/sqlalchemy/job_store.py
Normal file
112
watcher/db/sqlalchemy/job_store.py
Normal file
@@ -0,0 +1,112 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
# Copyright (c) 2017 Servionica LTD
|
||||
#
|
||||
# Authors: Alexander Chadin <a.chadin@servionica.ru>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from apscheduler.jobstores.base import ConflictingIdError
|
||||
from apscheduler.jobstores import sqlalchemy
|
||||
from apscheduler.util import datetime_to_utc_timestamp
|
||||
from apscheduler.util import maybe_ref
|
||||
|
||||
from watcher.common import context
|
||||
from watcher.common import service
|
||||
from watcher import objects
|
||||
|
||||
try:
|
||||
import cPickle as pickle
|
||||
except ImportError: # pragma: nocover
|
||||
import pickle
|
||||
|
||||
from sqlalchemy import Table, MetaData, select, and_
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
|
||||
class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
|
||||
"""Stores jobs in a database table using SQLAlchemy.
|
||||
|
||||
The table will be created if it doesn't exist in the database.
|
||||
Plugin alias: ``sqlalchemy``
|
||||
:param str url: connection string
|
||||
:param engine: an SQLAlchemy Engine to use instead of creating a new
|
||||
one based on ``url``
|
||||
:param str tablename: name of the table to store jobs in
|
||||
:param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead of
|
||||
creating a new one
|
||||
:param int pickle_protocol: pickle protocol level to use
|
||||
(for serialization), defaults to the highest available
|
||||
:param dict tag: tag description
|
||||
"""
|
||||
|
||||
def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
|
||||
metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
|
||||
tag=None):
|
||||
super(WatcherJobStore, self).__init__(url, engine, tablename,
|
||||
metadata, pickle_protocol)
|
||||
metadata = maybe_ref(metadata) or MetaData()
|
||||
self.jobs_t = Table(tablename, metadata, autoload=True,
|
||||
autoload_with=engine)
|
||||
service_ident = service.ServiceHeartbeat.get_service_name()
|
||||
self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]}
|
||||
self.service_id = objects.Service.list(context=context.make_context(),
|
||||
filters=self.tag)[0].id
|
||||
|
||||
def start(self, scheduler, alias):
|
||||
# There should be called 'start' method of parent of SQLAlchemyJobStore
|
||||
super(self.__class__.__bases__[0], self).start(scheduler, alias)
|
||||
|
||||
def add_job(self, job):
|
||||
insert = self.jobs_t.insert().values(**{
|
||||
'id': job.id,
|
||||
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
||||
'job_state': pickle.dumps(job.__getstate__(),
|
||||
self.pickle_protocol),
|
||||
'service_id': self.service_id,
|
||||
'tag': jsonutils.dumps(self.tag)
|
||||
})
|
||||
try:
|
||||
self.engine.execute(insert)
|
||||
except IntegrityError:
|
||||
raise ConflictingIdError(job.id)
|
||||
|
||||
def get_all_jobs(self):
|
||||
jobs = self._get_jobs(self.jobs_t.c.tag == jsonutils.dumps(self.tag))
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def _get_jobs(self, *conditions):
|
||||
jobs = []
|
||||
conditions += (self.jobs_t.c.service_id == self.service_id,)
|
||||
selectable = select(
|
||||
[self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag]
|
||||
).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
|
||||
failed_job_ids = set()
|
||||
for row in self.engine.execute(selectable):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(row.job_state))
|
||||
except Exception:
|
||||
self._logger.exception(
|
||||
'Unable to restore job "%s" -- removing it', row.id)
|
||||
failed_job_ids.add(row.id)
|
||||
|
||||
# Remove all the jobs we failed to restore
|
||||
if failed_job_ids:
|
||||
delete = self.jobs_t.delete().where(
|
||||
self.jobs_t.c.id.in_(failed_job_ids))
|
||||
self.engine.execute(delete)
|
||||
|
||||
return jobs
|
||||
@@ -24,6 +24,7 @@ from oslo_log import log
|
||||
|
||||
from watcher.applier import rpcapi
|
||||
from watcher.common import exception
|
||||
from watcher.common import service
|
||||
from watcher.decision_engine.planner import manager as planner_manager
|
||||
from watcher.decision_engine.strategy.context import default as default_context
|
||||
from watcher import notifications
|
||||
@@ -34,6 +35,7 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@six.add_metaclass(service.Singleton)
|
||||
class BaseAuditHandler(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
@@ -55,8 +57,9 @@ class BaseAuditHandler(object):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AuditHandler(BaseAuditHandler):
|
||||
def __init__(self, messaging):
|
||||
self._messaging = messaging
|
||||
|
||||
def __init__(self):
|
||||
super(AuditHandler, self).__init__()
|
||||
self._strategy_context = default_context.DefaultStrategyContext()
|
||||
self._planner_manager = planner_manager.PlannerManager()
|
||||
self._planner = None
|
||||
@@ -67,10 +70,6 @@ class AuditHandler(BaseAuditHandler):
|
||||
self._planner = self._planner_manager.load()
|
||||
return self._planner
|
||||
|
||||
@property
|
||||
def messaging(self):
|
||||
return self._messaging
|
||||
|
||||
@property
|
||||
def strategy_context(self):
|
||||
return self._strategy_context
|
||||
@@ -96,14 +95,12 @@ class AuditHandler(BaseAuditHandler):
|
||||
phase=fields.NotificationPhase.ERROR)
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def update_audit_state(audit, state):
|
||||
def update_audit_state(self, audit, state):
|
||||
LOG.debug("Update audit state: %s", state)
|
||||
audit.state = state
|
||||
audit.save()
|
||||
|
||||
@staticmethod
|
||||
def check_ongoing_action_plans(request_context):
|
||||
def check_ongoing_action_plans(self, request_context):
|
||||
a_plan_filters = {'state': objects.action_plan.State.ONGOING}
|
||||
ongoing_action_plans = objects.ActionPlan.list(
|
||||
request_context, filters=a_plan_filters)
|
||||
|
||||
@@ -20,30 +20,37 @@
|
||||
|
||||
import datetime
|
||||
|
||||
from apscheduler.schedulers import background
|
||||
from apscheduler.jobstores import memory
|
||||
|
||||
from watcher.common import context
|
||||
from watcher.common import scheduling
|
||||
from watcher import conf
|
||||
from watcher.db.sqlalchemy import api as sq_api
|
||||
from watcher.db.sqlalchemy import job_store
|
||||
from watcher.decision_engine.audit import base
|
||||
from watcher import objects
|
||||
|
||||
from watcher import conf
|
||||
|
||||
CONF = conf.CONF
|
||||
|
||||
|
||||
class ContinuousAuditHandler(base.AuditHandler):
|
||||
def __init__(self, messaging):
|
||||
super(ContinuousAuditHandler, self).__init__(messaging)
|
||||
def __init__(self):
|
||||
super(ContinuousAuditHandler, self).__init__()
|
||||
self._scheduler = None
|
||||
self.jobs = []
|
||||
self._start()
|
||||
self.context_show_deleted = context.RequestContext(is_admin=True,
|
||||
show_deleted=True)
|
||||
|
||||
@property
|
||||
def scheduler(self):
|
||||
if self._scheduler is None:
|
||||
self._scheduler = background.BackgroundScheduler()
|
||||
self._scheduler = scheduling.BackgroundSchedulerService(
|
||||
jobstores={
|
||||
'default': job_store.WatcherJobStore(
|
||||
engine=sq_api.get_engine()),
|
||||
'memory': memory.MemoryJobStore()
|
||||
}
|
||||
)
|
||||
return self._scheduler
|
||||
|
||||
def _is_audit_inactive(self, audit):
|
||||
@@ -52,11 +59,9 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
if objects.audit.AuditStateTransitionManager().is_inactive(audit):
|
||||
# if audit isn't in active states, audit's job must be removed to
|
||||
# prevent using of inactive audit in future.
|
||||
job_to_delete = [job for job in self.jobs
|
||||
if list(job.keys())[0] == audit.uuid][0]
|
||||
self.jobs.remove(job_to_delete)
|
||||
job_to_delete[audit.uuid].remove()
|
||||
|
||||
[job for job in self.scheduler.get_jobs()
|
||||
if job.name == 'execute_audit' and
|
||||
job.args[0].uuid == audit.uuid][0].remove()
|
||||
return True
|
||||
|
||||
return False
|
||||
@@ -76,7 +81,9 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
plan.save()
|
||||
return solution
|
||||
|
||||
def execute_audit(self, audit, request_context):
|
||||
@classmethod
|
||||
def execute_audit(cls, audit, request_context):
|
||||
self = cls()
|
||||
if not self._is_audit_inactive(audit):
|
||||
self.execute(audit, request_context)
|
||||
|
||||
@@ -90,22 +97,23 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
}
|
||||
audits = objects.Audit.list(
|
||||
audit_context, filters=audit_filters, eager=True)
|
||||
scheduler_job_args = [job.args for job in self.scheduler.get_jobs()
|
||||
if job.name == 'execute_audit']
|
||||
scheduler_job_args = [
|
||||
job.args for job in self.scheduler.get_jobs()
|
||||
if job.name == 'execute_audit']
|
||||
for audit in audits:
|
||||
if audit.uuid not in [arg[0].uuid for arg in scheduler_job_args]:
|
||||
job = self.scheduler.add_job(
|
||||
self.scheduler.add_job(
|
||||
self.execute_audit, 'interval',
|
||||
args=[audit, audit_context],
|
||||
seconds=audit.interval,
|
||||
name='execute_audit',
|
||||
next_run_time=datetime.datetime.now())
|
||||
self.jobs.append({audit.uuid: job})
|
||||
|
||||
def _start(self):
|
||||
def start(self):
|
||||
self.scheduler.add_job(
|
||||
self.launch_audits_periodically,
|
||||
'interval',
|
||||
seconds=CONF.watcher_decision_engine.continuous_audit_interval,
|
||||
next_run_time=datetime.datetime.now())
|
||||
next_run_time=datetime.datetime.now(),
|
||||
jobstore='memory')
|
||||
self.scheduler.start()
|
||||
|
||||
@@ -19,6 +19,7 @@ from watcher import objects
|
||||
|
||||
|
||||
class OneShotAuditHandler(base.AuditHandler):
|
||||
|
||||
def do_execute(self, audit, request_context):
|
||||
# execute the strategy
|
||||
solution = self.strategy_context.execute_strategy(
|
||||
|
||||
@@ -21,8 +21,9 @@ from concurrent import futures
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.audit import continuous as continuous_handler
|
||||
from watcher.decision_engine.audit import oneshot as oneshot_handler
|
||||
from watcher.decision_engine.audit import continuous as c_handler
|
||||
from watcher.decision_engine.audit import oneshot as o_handler
|
||||
|
||||
from watcher import objects
|
||||
|
||||
CONF = cfg.CONF
|
||||
@@ -35,19 +36,13 @@ class AuditEndpoint(object):
|
||||
self._messaging = messaging
|
||||
self._executor = futures.ThreadPoolExecutor(
|
||||
max_workers=CONF.watcher_decision_engine.max_workers)
|
||||
self._oneshot_handler = oneshot_handler.OneShotAuditHandler(
|
||||
self.messaging)
|
||||
self._continuous_handler = continuous_handler.ContinuousAuditHandler(
|
||||
self.messaging)
|
||||
self._oneshot_handler = o_handler.OneShotAuditHandler()
|
||||
self._continuous_handler = c_handler.ContinuousAuditHandler().start()
|
||||
|
||||
@property
|
||||
def executor(self):
|
||||
return self._executor
|
||||
|
||||
@property
|
||||
def messaging(self):
|
||||
return self._messaging
|
||||
|
||||
def do_trigger_audit(self, context, audit_uuid):
|
||||
audit = objects.Audit.get_by_uuid(context, audit_uuid, eager=True)
|
||||
self._oneshot_handler.execute(audit, context)
|
||||
|
||||
@@ -27,6 +27,10 @@ from watcher.tests import base
|
||||
class TestApplierManager(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestApplierManager, self).setUp()
|
||||
p_heartbeat = mock.patch.object(
|
||||
service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
self.applier = service.Service(applier_manager.ApplierManager)
|
||||
|
||||
@mock.patch.object(om.rpc.server.RPCServer, "stop")
|
||||
|
||||
@@ -22,6 +22,7 @@ import types
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
from watcher.common import service as watcher_service
|
||||
|
||||
from watcher.cmd import applier
|
||||
from watcher.tests import base
|
||||
@@ -39,6 +40,10 @@ class TestApplier(base.BaseTestCase):
|
||||
|
||||
_fake_parse_method = types.MethodType(_fake_parse, self.conf)
|
||||
self.conf._parse_cli_opts = _fake_parse_method
|
||||
p_heartbeat = mock.patch.object(
|
||||
watcher_service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestApplier, self).tearDown()
|
||||
|
||||
@@ -24,6 +24,8 @@ from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
|
||||
from watcher.cmd import decisionengine
|
||||
from watcher.common import service as watcher_service
|
||||
from watcher.decision_engine.audit import continuous
|
||||
from watcher.decision_engine import sync
|
||||
from watcher.tests import base
|
||||
|
||||
@@ -42,6 +44,15 @@ class TestDecisionEngine(base.BaseTestCase):
|
||||
_fake_parse_method = types.MethodType(_fake_parse, self.conf)
|
||||
self.conf._parse_cli_opts = _fake_parse_method
|
||||
|
||||
p_heartbeat = mock.patch.object(
|
||||
watcher_service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
p_continuoushandler = mock.patch.object(
|
||||
continuous.ContinuousAuditHandler, "start")
|
||||
self.m_continuoushandler = p_continuoushandler.start()
|
||||
self.addCleanup(p_continuoushandler.stop)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestDecisionEngine, self).tearDown()
|
||||
self.conf._parse_cli_opts = self._parse_cli_opts
|
||||
|
||||
@@ -53,10 +53,9 @@ class TestServiceHeartbeat(base.TestCase):
|
||||
def test_send_beat_with_creating_service(self, mock_create,
|
||||
mock_list):
|
||||
CONF.set_default('host', 'fake-fqdn')
|
||||
service_heartbeat = service.ServiceHeartbeat(
|
||||
service_name='watcher-service')
|
||||
|
||||
mock_list.return_value = []
|
||||
service_heartbeat.send_beat()
|
||||
service.ServiceHeartbeat(service_name='watcher-service')
|
||||
mock_list.assert_called_once_with(mock.ANY,
|
||||
filters={'name': 'watcher-service',
|
||||
'host': 'fake-fqdn'})
|
||||
@@ -65,12 +64,11 @@ class TestServiceHeartbeat(base.TestCase):
|
||||
@mock.patch.object(objects.Service, 'list')
|
||||
@mock.patch.object(objects.Service, 'save')
|
||||
def test_send_beat_without_creating_service(self, mock_save, mock_list):
|
||||
service_heartbeat = service.ServiceHeartbeat(
|
||||
service_name='watcher-service')
|
||||
|
||||
mock_list.return_value = [objects.Service(mock.Mock(),
|
||||
name='watcher-service',
|
||||
host='controller')]
|
||||
service_heartbeat.send_beat()
|
||||
service.ServiceHeartbeat(service_name='watcher-service')
|
||||
self.assertEqual(1, mock_save.call_count)
|
||||
|
||||
|
||||
|
||||
@@ -14,11 +14,14 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from apscheduler.schedulers import background
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from apscheduler import job
|
||||
|
||||
from watcher.applier import rpcapi
|
||||
from watcher.common import scheduling
|
||||
from watcher.db.sqlalchemy import api as sq_api
|
||||
from watcher.decision_engine.audit import continuous
|
||||
from watcher.decision_engine.audit import oneshot
|
||||
from watcher.decision_engine.model.collector import manager
|
||||
@@ -57,7 +60,7 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_without_errors(self, m_collector):
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
expected_calls = [
|
||||
@@ -83,7 +86,7 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
def test_trigger_audit_with_error(self, m_collector, m_do_execute):
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
m_do_execute.side_effect = Exception
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
expected_calls = [
|
||||
@@ -102,7 +105,7 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_state_succeeded(self, m_collector):
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
audit = objects.audit.Audit.get_by_uuid(self.context, self.audit.uuid)
|
||||
self.assertEqual(objects.audit.State.SUCCEEDED, audit.state)
|
||||
@@ -127,9 +130,8 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_send_notification(self, m_collector):
|
||||
messaging = mock.MagicMock()
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(messaging)
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
expected_calls = [
|
||||
@@ -194,7 +196,7 @@ class TestAutoTriggerActionPlan(base.DbTestCase):
|
||||
def test_trigger_audit_with_actionplan_ongoing(self, mock_list,
|
||||
mock_do_execute):
|
||||
mock_list.return_value = [self.ongoing_action_plan]
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
self.assertFalse(mock_do_execute.called)
|
||||
|
||||
@@ -205,9 +207,9 @@ class TestAutoTriggerActionPlan(base.DbTestCase):
|
||||
mock_list, mock_applier):
|
||||
mock_get_by_id.return_value = self.audit
|
||||
mock_list.return_value = []
|
||||
auto_trigger_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
with mock.patch.object(auto_trigger_handler, 'do_schedule',
|
||||
new_callable=mock.PropertyMock) as m_schedule:
|
||||
auto_trigger_handler = oneshot.OneShotAuditHandler()
|
||||
with mock.patch.object(auto_trigger_handler,
|
||||
'do_schedule') as m_schedule:
|
||||
m_schedule().uuid = self.recommended_action_plan.uuid
|
||||
auto_trigger_handler.post_execute(self.audit, mock.MagicMock(),
|
||||
self.context)
|
||||
@@ -234,30 +236,39 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
goal=self.goal)
|
||||
for id_ in range(2, 4)]
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
def test_launch_audits_periodically(self, mock_list, mock_jobs,
|
||||
m_add_job, m_collector):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
m_add_job, m_engine, m_service):
|
||||
audit_handler = continuous.ContinuousAuditHandler()
|
||||
mock_list.return_value = self.audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
m_engine.return_value = mock.MagicMock()
|
||||
m_add_job.return_value = audit_handler.execute_audit(
|
||||
self.audits[0], self.context)
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_service.assert_called()
|
||||
m_engine.assert_called()
|
||||
m_add_job.assert_called()
|
||||
mock_jobs.assert_called()
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
def test_launch_multiply_audits_periodically(self, mock_list,
|
||||
mock_jobs, m_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
mock_jobs, m_add_job,
|
||||
m_engine, m_service):
|
||||
audit_handler = continuous.ContinuousAuditHandler()
|
||||
mock_list.return_value = self.audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
m_engine.return_value = mock.MagicMock()
|
||||
m_service.return_value = mock.MagicMock()
|
||||
calls = [mock.call(audit_handler.execute_audit, 'interval',
|
||||
args=[mock.ANY, mock.ANY],
|
||||
seconds=3600,
|
||||
@@ -266,26 +277,39 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_add_job.assert_has_calls(calls)
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
def test_period_audit_not_called_when_deleted(self, mock_list,
|
||||
mock_jobs, m_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
mock_jobs, m_add_job,
|
||||
m_engine, m_service):
|
||||
audit_handler = continuous.ContinuousAuditHandler()
|
||||
mock_list.return_value = self.audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
m_service.return_value = mock.MagicMock()
|
||||
m_engine.return_value = mock.MagicMock()
|
||||
self.audits[1].state = objects.audit.State.CANCELLED
|
||||
self.audits[0].state = objects.audit.State.SUSPENDED
|
||||
|
||||
for state in [objects.audit.State.CANCELLED,
|
||||
objects.audit.State.SUSPENDED]:
|
||||
self.audits[1].state = state
|
||||
calls = [mock.call(audit_handler.execute_audit, 'interval',
|
||||
args=[mock.ANY, mock.ANY],
|
||||
seconds=3600,
|
||||
name='execute_audit',
|
||||
next_run_time=mock.ANY)]
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_add_job.assert_has_calls(calls)
|
||||
ap_jobs = [job.Job(mock.MagicMock(), name='execute_audit',
|
||||
func=audit_handler.execute_audit,
|
||||
args=(self.audits[0], mock.MagicMock()),
|
||||
kwargs={}),
|
||||
job.Job(mock.MagicMock(), name='execute_audit',
|
||||
func=audit_handler.execute_audit,
|
||||
args=(self.audits[1], mock.MagicMock()),
|
||||
kwargs={})
|
||||
]
|
||||
mock_jobs.return_value = ap_jobs
|
||||
audit_handler.launch_audits_periodically()
|
||||
|
||||
audit_handler.update_audit_state(self.audits[1], state)
|
||||
is_inactive = audit_handler._is_audit_inactive(self.audits[1])
|
||||
audit_handler.update_audit_state(self.audits[1],
|
||||
objects.audit.State.CANCELLED)
|
||||
audit_handler.update_audit_state(self.audits[0],
|
||||
objects.audit.State.SUSPENDED)
|
||||
is_inactive = audit_handler._is_audit_inactive(self.audits[1])
|
||||
self.assertTrue(is_inactive)
|
||||
is_inactive = audit_handler._is_audit_inactive(self.audits[0])
|
||||
self.assertTrue(is_inactive)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from watcher.decision_engine.audit import continuous as continuous_handler
|
||||
from watcher.decision_engine.audit import oneshot as oneshot_handler
|
||||
from watcher.decision_engine.messaging import audit_endpoint
|
||||
from watcher.decision_engine.model.collector import manager
|
||||
@@ -34,11 +35,12 @@ class TestAuditEndpoint(base.DbTestCase):
|
||||
self.context,
|
||||
audit_template_id=self.audit_template.id)
|
||||
|
||||
@mock.patch.object(continuous_handler.ContinuousAuditHandler, 'start')
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_do_trigger_audit(self, mock_collector):
|
||||
def test_do_trigger_audit(self, mock_collector, mock_handler):
|
||||
mock_collector.return_value = faker_cluster_state.FakerModelCollector()
|
||||
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler
|
||||
endpoint = audit_endpoint.AuditEndpoint(audit_handler)
|
||||
|
||||
with mock.patch.object(oneshot_handler.OneShotAuditHandler,
|
||||
@@ -48,11 +50,12 @@ class TestAuditEndpoint(base.DbTestCase):
|
||||
|
||||
self.assertEqual(mock_call.call_count, 1)
|
||||
|
||||
@mock.patch.object(continuous_handler.ContinuousAuditHandler, 'start')
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit(self, mock_collector):
|
||||
def test_trigger_audit(self, mock_collector, mock_handler):
|
||||
mock_collector.return_value = faker_cluster_state.FakerModelCollector()
|
||||
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler
|
||||
endpoint = audit_endpoint.AuditEndpoint(audit_handler)
|
||||
|
||||
with mock.patch.object(endpoint.executor, 'submit') as mock_call:
|
||||
|
||||
@@ -72,8 +72,9 @@ class TestReceiveNotifications(NotificationTestCase):
|
||||
m_from_dict.return_value = self.context
|
||||
self.addCleanup(p_from_dict.stop)
|
||||
|
||||
@mock.patch.object(watcher_service.ServiceHeartbeat, 'send_beat')
|
||||
@mock.patch.object(DummyNotification, 'info')
|
||||
def test_receive_dummy_notification(self, m_info):
|
||||
def test_receive_dummy_notification(self, m_info, m_heartbeat):
|
||||
message = {
|
||||
'publisher_id': 'nova-compute',
|
||||
'event_type': 'compute.dummy',
|
||||
@@ -90,8 +91,9 @@ class TestReceiveNotifications(NotificationTestCase):
|
||||
{'data': {'nested': 'TEST'}},
|
||||
{'message_id': None, 'timestamp': None})
|
||||
|
||||
@mock.patch.object(watcher_service.ServiceHeartbeat, 'send_beat')
|
||||
@mock.patch.object(DummyNotification, 'info')
|
||||
def test_skip_unwanted_notification(self, m_info):
|
||||
def test_skip_unwanted_notification(self, m_info, m_heartbeat):
|
||||
message = {
|
||||
'publisher_id': 'nova-compute',
|
||||
'event_type': 'compute.dummy',
|
||||
|
||||
@@ -56,6 +56,10 @@ class TestReceiveNovaNotifications(NotificationTestCase):
|
||||
m_from_dict = p_from_dict.start()
|
||||
m_from_dict.return_value = self.context
|
||||
self.addCleanup(p_from_dict.stop)
|
||||
p_heartbeat = mock.patch.object(
|
||||
watcher_service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
|
||||
@mock.patch.object(novanotification.ServiceUpdated, 'info')
|
||||
def test_nova_receive_service_update(self, m_info):
|
||||
|
||||
Reference in New Issue
Block a user