diff --git a/watcher/common/service.py b/watcher/common/service.py index 3c80edc1b..f1d74ab3f 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -110,16 +110,19 @@ class WSGIService(service.ServiceBase): class ServiceHeartbeat(scheduling.BackgroundSchedulerService): + service_name = None + def __init__(self, gconfig=None, service_name=None, **kwargs): gconfig = None or {} super(ServiceHeartbeat, self).__init__(gconfig, **kwargs) - self.service_name = service_name + ServiceHeartbeat.service_name = service_name self.context = context.make_context() + self.send_beat() def send_beat(self): host = CONF.host watcher_list = objects.Service.list( - self.context, filters={'name': self.service_name, + self.context, filters={'name': ServiceHeartbeat.service_name, 'host': host}) if watcher_list: watcher_service = watcher_list[0] @@ -127,7 +130,7 @@ class ServiceHeartbeat(scheduling.BackgroundSchedulerService): watcher_service.save() else: watcher_service = objects.Service(self.context) - watcher_service.name = self.service_name + watcher_service.name = ServiceHeartbeat.service_name watcher_service.host = host watcher_service.create() @@ -135,6 +138,10 @@ class ServiceHeartbeat(scheduling.BackgroundSchedulerService): self.add_job(self.send_beat, 'interval', seconds=60, next_run_time=datetime.datetime.now()) + @classmethod + def get_service_name(cls): + return CONF.host, cls.service_name + def start(self): """Start service.""" self.add_heartbeat_job() @@ -168,6 +175,13 @@ class Service(service.ServiceBase): self.conductor_topic = self.manager.conductor_topic self.notification_topics = self.manager.notification_topics + self.heartbeat = None + + self.service_name = self.manager.service_name + if self.service_name: + self.heartbeat = ServiceHeartbeat( + service_name=self.manager.service_name) + self.conductor_endpoints = [ ep(self) for ep in self.manager.conductor_endpoints ] @@ -183,8 +197,6 @@ class Service(service.ServiceBase): self.conductor_topic_handler = None self.notification_handler = None - self.heartbeat = None - if self.conductor_topic and self.conductor_endpoints: self.conductor_topic_handler = self.build_topic_handler( self.conductor_topic, self.conductor_endpoints) @@ -192,10 +204,6 @@ class Service(service.ServiceBase): self.notification_handler = self.build_notification_handler( self.notification_topics, self.notification_endpoints ) - self.service_name = self.manager.service_name - if self.service_name: - self.heartbeat = ServiceHeartbeat( - service_name=self.manager.service_name) @property def transport(self): diff --git a/watcher/db/sqlalchemy/alembic/versions/0f6042416884_.py b/watcher/db/sqlalchemy/alembic/versions/0f6042416884_.py new file mode 100644 index 000000000..56f4c8cd8 --- /dev/null +++ b/watcher/db/sqlalchemy/alembic/versions/0f6042416884_.py @@ -0,0 +1,33 @@ +"""Add apscheduler_jobs table to store background jobs + +Revision ID: 0f6042416884 +Revises: 001 +Create Date: 2017-03-24 11:21:29.036532 + +""" +from alembic import op +import sqlalchemy as sa + +from watcher.db.sqlalchemy import models + +# revision identifiers, used by Alembic. +revision = '0f6042416884' +down_revision = '001' + + +def upgrade(): + op.create_table( + 'apscheduler_jobs', + sa.Column('id', sa.Unicode(191, _warn_on_bytestring=False), + nullable=False), + sa.Column('next_run_time', sa.Float(25), index=True), + sa.Column('job_state', sa.LargeBinary, nullable=False), + sa.Column('service_id', sa.Integer(), nullable=False), + sa.Column('tag', models.JSONEncodedDict(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['service_id'], ['services.id']) + ) + + +def downgrade(): + op.drop_table('apscheduler_jobs') diff --git a/watcher/db/sqlalchemy/job_store.py b/watcher/db/sqlalchemy/job_store.py new file mode 100644 index 000000000..da5028f43 --- /dev/null +++ b/watcher/db/sqlalchemy/job_store.py @@ -0,0 +1,112 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 Servionica LTD +# +# Authors: Alexander Chadin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_serialization import jsonutils + +from apscheduler.jobstores.base import ConflictingIdError +from apscheduler.jobstores import sqlalchemy +from apscheduler.util import datetime_to_utc_timestamp +from apscheduler.util import maybe_ref + +from watcher.common import context +from watcher.common import service +from watcher import objects + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +from sqlalchemy import Table, MetaData, select, and_ +from sqlalchemy.exc import IntegrityError + + +class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore): + """Stores jobs in a database table using SQLAlchemy. + + The table will be created if it doesn't exist in the database. + Plugin alias: ``sqlalchemy`` + :param str url: connection string + :param engine: an SQLAlchemy Engine to use instead of creating a new + one based on ``url`` + :param str tablename: name of the table to store jobs in + :param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead of + creating a new one + :param int pickle_protocol: pickle protocol level to use + (for serialization), defaults to the highest available + :param dict tag: tag description + """ + + def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', + metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL, + tag=None): + super(WatcherJobStore, self).__init__(url, engine, tablename, + metadata, pickle_protocol) + metadata = maybe_ref(metadata) or MetaData() + self.jobs_t = Table(tablename, metadata, autoload=True, + autoload_with=engine) + service_ident = service.ServiceHeartbeat.get_service_name() + self.tag = tag or {'host': service_ident[0], 'name': service_ident[1]} + self.service_id = objects.Service.list(context=context.make_context(), + filters=self.tag)[0].id + + def start(self, scheduler, alias): + # There should be called 'start' method of parent of SQLAlchemyJobStore + super(self.__class__.__bases__[0], self).start(scheduler, alias) + + def add_job(self, job): + insert = self.jobs_t.insert().values(**{ + 'id': job.id, + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': pickle.dumps(job.__getstate__(), + self.pickle_protocol), + 'service_id': self.service_id, + 'tag': jsonutils.dumps(self.tag) + }) + try: + self.engine.execute(insert) + except IntegrityError: + raise ConflictingIdError(job.id) + + def get_all_jobs(self): + jobs = self._get_jobs(self.jobs_t.c.tag == jsonutils.dumps(self.tag)) + self._fix_paused_jobs_sorting(jobs) + return jobs + + def _get_jobs(self, *conditions): + jobs = [] + conditions += (self.jobs_t.c.service_id == self.service_id,) + selectable = select( + [self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag] + ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions)) + failed_job_ids = set() + for row in self.engine.execute(selectable): + try: + jobs.append(self._reconstitute_job(row.job_state)) + except Exception: + self._logger.exception( + 'Unable to restore job "%s" -- removing it', row.id) + failed_job_ids.add(row.id) + + # Remove all the jobs we failed to restore + if failed_job_ids: + delete = self.jobs_t.delete().where( + self.jobs_t.c.id.in_(failed_job_ids)) + self.engine.execute(delete) + + return jobs diff --git a/watcher/decision_engine/audit/base.py b/watcher/decision_engine/audit/base.py index 092374b0a..cccb1aaef 100644 --- a/watcher/decision_engine/audit/base.py +++ b/watcher/decision_engine/audit/base.py @@ -24,6 +24,7 @@ from oslo_log import log from watcher.applier import rpcapi from watcher.common import exception +from watcher.common import service from watcher.decision_engine.planner import manager as planner_manager from watcher.decision_engine.strategy.context import default as default_context from watcher import notifications @@ -34,6 +35,7 @@ LOG = log.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) +@six.add_metaclass(service.Singleton) class BaseAuditHandler(object): @abc.abstractmethod @@ -55,8 +57,9 @@ class BaseAuditHandler(object): @six.add_metaclass(abc.ABCMeta) class AuditHandler(BaseAuditHandler): - def __init__(self, messaging): - self._messaging = messaging + + def __init__(self): + super(AuditHandler, self).__init__() self._strategy_context = default_context.DefaultStrategyContext() self._planner_manager = planner_manager.PlannerManager() self._planner = None @@ -67,10 +70,6 @@ class AuditHandler(BaseAuditHandler): self._planner = self._planner_manager.load() return self._planner - @property - def messaging(self): - return self._messaging - @property def strategy_context(self): return self._strategy_context @@ -96,14 +95,12 @@ class AuditHandler(BaseAuditHandler): phase=fields.NotificationPhase.ERROR) raise - @staticmethod - def update_audit_state(audit, state): + def update_audit_state(self, audit, state): LOG.debug("Update audit state: %s", state) audit.state = state audit.save() - @staticmethod - def check_ongoing_action_plans(request_context): + def check_ongoing_action_plans(self, request_context): a_plan_filters = {'state': objects.action_plan.State.ONGOING} ongoing_action_plans = objects.ActionPlan.list( request_context, filters=a_plan_filters) diff --git a/watcher/decision_engine/audit/continuous.py b/watcher/decision_engine/audit/continuous.py index 91fb530c0..966d40efc 100644 --- a/watcher/decision_engine/audit/continuous.py +++ b/watcher/decision_engine/audit/continuous.py @@ -20,30 +20,37 @@ import datetime -from apscheduler.schedulers import background +from apscheduler.jobstores import memory from watcher.common import context +from watcher.common import scheduling +from watcher import conf +from watcher.db.sqlalchemy import api as sq_api +from watcher.db.sqlalchemy import job_store from watcher.decision_engine.audit import base from watcher import objects -from watcher import conf CONF = conf.CONF class ContinuousAuditHandler(base.AuditHandler): - def __init__(self, messaging): - super(ContinuousAuditHandler, self).__init__(messaging) + def __init__(self): + super(ContinuousAuditHandler, self).__init__() self._scheduler = None - self.jobs = [] - self._start() self.context_show_deleted = context.RequestContext(is_admin=True, show_deleted=True) @property def scheduler(self): if self._scheduler is None: - self._scheduler = background.BackgroundScheduler() + self._scheduler = scheduling.BackgroundSchedulerService( + jobstores={ + 'default': job_store.WatcherJobStore( + engine=sq_api.get_engine()), + 'memory': memory.MemoryJobStore() + } + ) return self._scheduler def _is_audit_inactive(self, audit): @@ -52,11 +59,9 @@ class ContinuousAuditHandler(base.AuditHandler): if objects.audit.AuditStateTransitionManager().is_inactive(audit): # if audit isn't in active states, audit's job must be removed to # prevent using of inactive audit in future. - job_to_delete = [job for job in self.jobs - if list(job.keys())[0] == audit.uuid][0] - self.jobs.remove(job_to_delete) - job_to_delete[audit.uuid].remove() - + [job for job in self.scheduler.get_jobs() + if job.name == 'execute_audit' and + job.args[0].uuid == audit.uuid][0].remove() return True return False @@ -76,7 +81,9 @@ class ContinuousAuditHandler(base.AuditHandler): plan.save() return solution - def execute_audit(self, audit, request_context): + @classmethod + def execute_audit(cls, audit, request_context): + self = cls() if not self._is_audit_inactive(audit): self.execute(audit, request_context) @@ -90,22 +97,23 @@ class ContinuousAuditHandler(base.AuditHandler): } audits = objects.Audit.list( audit_context, filters=audit_filters, eager=True) - scheduler_job_args = [job.args for job in self.scheduler.get_jobs() - if job.name == 'execute_audit'] + scheduler_job_args = [ + job.args for job in self.scheduler.get_jobs() + if job.name == 'execute_audit'] for audit in audits: if audit.uuid not in [arg[0].uuid for arg in scheduler_job_args]: - job = self.scheduler.add_job( + self.scheduler.add_job( self.execute_audit, 'interval', args=[audit, audit_context], seconds=audit.interval, name='execute_audit', next_run_time=datetime.datetime.now()) - self.jobs.append({audit.uuid: job}) - def _start(self): + def start(self): self.scheduler.add_job( self.launch_audits_periodically, 'interval', seconds=CONF.watcher_decision_engine.continuous_audit_interval, - next_run_time=datetime.datetime.now()) + next_run_time=datetime.datetime.now(), + jobstore='memory') self.scheduler.start() diff --git a/watcher/decision_engine/audit/oneshot.py b/watcher/decision_engine/audit/oneshot.py index a5ac2d7eb..fae2512d4 100644 --- a/watcher/decision_engine/audit/oneshot.py +++ b/watcher/decision_engine/audit/oneshot.py @@ -19,6 +19,7 @@ from watcher import objects class OneShotAuditHandler(base.AuditHandler): + def do_execute(self, audit, request_context): # execute the strategy solution = self.strategy_context.execute_strategy( diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index ec0934d3e..54d47aed8 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -21,8 +21,9 @@ from concurrent import futures from oslo_config import cfg from oslo_log import log -from watcher.decision_engine.audit import continuous as continuous_handler -from watcher.decision_engine.audit import oneshot as oneshot_handler +from watcher.decision_engine.audit import continuous as c_handler +from watcher.decision_engine.audit import oneshot as o_handler + from watcher import objects CONF = cfg.CONF @@ -35,19 +36,13 @@ class AuditEndpoint(object): self._messaging = messaging self._executor = futures.ThreadPoolExecutor( max_workers=CONF.watcher_decision_engine.max_workers) - self._oneshot_handler = oneshot_handler.OneShotAuditHandler( - self.messaging) - self._continuous_handler = continuous_handler.ContinuousAuditHandler( - self.messaging) + self._oneshot_handler = o_handler.OneShotAuditHandler() + self._continuous_handler = c_handler.ContinuousAuditHandler().start() @property def executor(self): return self._executor - @property - def messaging(self): - return self._messaging - def do_trigger_audit(self, context, audit_uuid): audit = objects.Audit.get_by_uuid(context, audit_uuid, eager=True) self._oneshot_handler.execute(audit, context) diff --git a/watcher/tests/applier/test_applier_manager.py b/watcher/tests/applier/test_applier_manager.py index ccb79bb96..bfa67504e 100644 --- a/watcher/tests/applier/test_applier_manager.py +++ b/watcher/tests/applier/test_applier_manager.py @@ -27,6 +27,10 @@ from watcher.tests import base class TestApplierManager(base.TestCase): def setUp(self): super(TestApplierManager, self).setUp() + p_heartbeat = mock.patch.object( + service.ServiceHeartbeat, "send_beat") + self.m_heartbeat = p_heartbeat.start() + self.addCleanup(p_heartbeat.stop) self.applier = service.Service(applier_manager.ApplierManager) @mock.patch.object(om.rpc.server.RPCServer, "stop") diff --git a/watcher/tests/cmd/test_applier.py b/watcher/tests/cmd/test_applier.py index e556eed5e..25690ebfa 100644 --- a/watcher/tests/cmd/test_applier.py +++ b/watcher/tests/cmd/test_applier.py @@ -22,6 +22,7 @@ import types import mock from oslo_config import cfg from oslo_service import service +from watcher.common import service as watcher_service from watcher.cmd import applier from watcher.tests import base @@ -39,6 +40,10 @@ class TestApplier(base.BaseTestCase): _fake_parse_method = types.MethodType(_fake_parse, self.conf) self.conf._parse_cli_opts = _fake_parse_method + p_heartbeat = mock.patch.object( + watcher_service.ServiceHeartbeat, "send_beat") + self.m_heartbeat = p_heartbeat.start() + self.addCleanup(p_heartbeat.stop) def tearDown(self): super(TestApplier, self).tearDown() diff --git a/watcher/tests/cmd/test_decision_engine.py b/watcher/tests/cmd/test_decision_engine.py index 0f962351c..3f0380ba2 100644 --- a/watcher/tests/cmd/test_decision_engine.py +++ b/watcher/tests/cmd/test_decision_engine.py @@ -24,6 +24,8 @@ from oslo_config import cfg from oslo_service import service from watcher.cmd import decisionengine +from watcher.common import service as watcher_service +from watcher.decision_engine.audit import continuous from watcher.decision_engine import sync from watcher.tests import base @@ -42,6 +44,15 @@ class TestDecisionEngine(base.BaseTestCase): _fake_parse_method = types.MethodType(_fake_parse, self.conf) self.conf._parse_cli_opts = _fake_parse_method + p_heartbeat = mock.patch.object( + watcher_service.ServiceHeartbeat, "send_beat") + self.m_heartbeat = p_heartbeat.start() + self.addCleanup(p_heartbeat.stop) + p_continuoushandler = mock.patch.object( + continuous.ContinuousAuditHandler, "start") + self.m_continuoushandler = p_continuoushandler.start() + self.addCleanup(p_continuoushandler.stop) + def tearDown(self): super(TestDecisionEngine, self).tearDown() self.conf._parse_cli_opts = self._parse_cli_opts diff --git a/watcher/tests/common/test_service.py b/watcher/tests/common/test_service.py index e9494b0d3..71f4f37ee 100644 --- a/watcher/tests/common/test_service.py +++ b/watcher/tests/common/test_service.py @@ -53,10 +53,9 @@ class TestServiceHeartbeat(base.TestCase): def test_send_beat_with_creating_service(self, mock_create, mock_list): CONF.set_default('host', 'fake-fqdn') - service_heartbeat = service.ServiceHeartbeat( - service_name='watcher-service') + mock_list.return_value = [] - service_heartbeat.send_beat() + service.ServiceHeartbeat(service_name='watcher-service') mock_list.assert_called_once_with(mock.ANY, filters={'name': 'watcher-service', 'host': 'fake-fqdn'}) @@ -65,12 +64,11 @@ class TestServiceHeartbeat(base.TestCase): @mock.patch.object(objects.Service, 'list') @mock.patch.object(objects.Service, 'save') def test_send_beat_without_creating_service(self, mock_save, mock_list): - service_heartbeat = service.ServiceHeartbeat( - service_name='watcher-service') + mock_list.return_value = [objects.Service(mock.Mock(), name='watcher-service', host='controller')] - service_heartbeat.send_beat() + service.ServiceHeartbeat(service_name='watcher-service') self.assertEqual(1, mock_save.call_count) diff --git a/watcher/tests/decision_engine/audit/test_audit_handlers.py b/watcher/tests/decision_engine/audit/test_audit_handlers.py index 42079c5f3..f992aa268 100644 --- a/watcher/tests/decision_engine/audit/test_audit_handlers.py +++ b/watcher/tests/decision_engine/audit/test_audit_handlers.py @@ -14,11 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from apscheduler.schedulers import background import mock from oslo_utils import uuidutils +from apscheduler import job + from watcher.applier import rpcapi +from watcher.common import scheduling +from watcher.db.sqlalchemy import api as sq_api from watcher.decision_engine.audit import continuous from watcher.decision_engine.audit import oneshot from watcher.decision_engine.model.collector import manager @@ -57,7 +60,7 @@ class TestOneShotAuditHandler(base.DbTestCase): @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") def test_trigger_audit_without_errors(self, m_collector): m_collector.return_value = faker.FakerModelCollector() - audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) + audit_handler = oneshot.OneShotAuditHandler() audit_handler.execute(self.audit, self.context) expected_calls = [ @@ -83,7 +86,7 @@ class TestOneShotAuditHandler(base.DbTestCase): def test_trigger_audit_with_error(self, m_collector, m_do_execute): m_collector.return_value = faker.FakerModelCollector() m_do_execute.side_effect = Exception - audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) + audit_handler = oneshot.OneShotAuditHandler() audit_handler.execute(self.audit, self.context) expected_calls = [ @@ -102,7 +105,7 @@ class TestOneShotAuditHandler(base.DbTestCase): @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") def test_trigger_audit_state_succeeded(self, m_collector): m_collector.return_value = faker.FakerModelCollector() - audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) + audit_handler = oneshot.OneShotAuditHandler() audit_handler.execute(self.audit, self.context) audit = objects.audit.Audit.get_by_uuid(self.context, self.audit.uuid) self.assertEqual(objects.audit.State.SUCCEEDED, audit.state) @@ -127,9 +130,8 @@ class TestOneShotAuditHandler(base.DbTestCase): @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") def test_trigger_audit_send_notification(self, m_collector): - messaging = mock.MagicMock() m_collector.return_value = faker.FakerModelCollector() - audit_handler = oneshot.OneShotAuditHandler(messaging) + audit_handler = oneshot.OneShotAuditHandler() audit_handler.execute(self.audit, self.context) expected_calls = [ @@ -194,7 +196,7 @@ class TestAutoTriggerActionPlan(base.DbTestCase): def test_trigger_audit_with_actionplan_ongoing(self, mock_list, mock_do_execute): mock_list.return_value = [self.ongoing_action_plan] - audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) + audit_handler = oneshot.OneShotAuditHandler() audit_handler.execute(self.audit, self.context) self.assertFalse(mock_do_execute.called) @@ -205,9 +207,9 @@ class TestAutoTriggerActionPlan(base.DbTestCase): mock_list, mock_applier): mock_get_by_id.return_value = self.audit mock_list.return_value = [] - auto_trigger_handler = oneshot.OneShotAuditHandler(mock.MagicMock()) - with mock.patch.object(auto_trigger_handler, 'do_schedule', - new_callable=mock.PropertyMock) as m_schedule: + auto_trigger_handler = oneshot.OneShotAuditHandler() + with mock.patch.object(auto_trigger_handler, + 'do_schedule') as m_schedule: m_schedule().uuid = self.recommended_action_plan.uuid auto_trigger_handler.post_execute(self.audit, mock.MagicMock(), self.context) @@ -234,30 +236,39 @@ class TestContinuousAuditHandler(base.DbTestCase): goal=self.goal) for id_ in range(2, 4)] - @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - @mock.patch.object(background.BackgroundScheduler, 'add_job') - @mock.patch.object(background.BackgroundScheduler, 'get_jobs') + @mock.patch.object(objects.service.Service, 'list') + @mock.patch.object(sq_api, 'get_engine') + @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job') + @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs') @mock.patch.object(objects.audit.Audit, 'list') def test_launch_audits_periodically(self, mock_list, mock_jobs, - m_add_job, m_collector): - audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock()) + m_add_job, m_engine, m_service): + audit_handler = continuous.ContinuousAuditHandler() mock_list.return_value = self.audits mock_jobs.return_value = mock.MagicMock() + m_engine.return_value = mock.MagicMock() m_add_job.return_value = audit_handler.execute_audit( self.audits[0], self.context) - m_collector.return_value = faker.FakerModelCollector() audit_handler.launch_audits_periodically() + m_service.assert_called() + m_engine.assert_called() m_add_job.assert_called() + mock_jobs.assert_called() - @mock.patch.object(background.BackgroundScheduler, 'add_job') - @mock.patch.object(background.BackgroundScheduler, 'get_jobs') + @mock.patch.object(objects.service.Service, 'list') + @mock.patch.object(sq_api, 'get_engine') + @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job') + @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs') @mock.patch.object(objects.audit.Audit, 'list') def test_launch_multiply_audits_periodically(self, mock_list, - mock_jobs, m_add_job): - audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock()) + mock_jobs, m_add_job, + m_engine, m_service): + audit_handler = continuous.ContinuousAuditHandler() mock_list.return_value = self.audits mock_jobs.return_value = mock.MagicMock() + m_engine.return_value = mock.MagicMock() + m_service.return_value = mock.MagicMock() calls = [mock.call(audit_handler.execute_audit, 'interval', args=[mock.ANY, mock.ANY], seconds=3600, @@ -266,26 +277,39 @@ class TestContinuousAuditHandler(base.DbTestCase): audit_handler.launch_audits_periodically() m_add_job.assert_has_calls(calls) - @mock.patch.object(background.BackgroundScheduler, 'add_job') - @mock.patch.object(background.BackgroundScheduler, 'get_jobs') + @mock.patch.object(objects.service.Service, 'list') + @mock.patch.object(sq_api, 'get_engine') + @mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job') + @mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs') @mock.patch.object(objects.audit.Audit, 'list') def test_period_audit_not_called_when_deleted(self, mock_list, - mock_jobs, m_add_job): - audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock()) + mock_jobs, m_add_job, + m_engine, m_service): + audit_handler = continuous.ContinuousAuditHandler() mock_list.return_value = self.audits mock_jobs.return_value = mock.MagicMock() + m_service.return_value = mock.MagicMock() + m_engine.return_value = mock.MagicMock() + self.audits[1].state = objects.audit.State.CANCELLED + self.audits[0].state = objects.audit.State.SUSPENDED - for state in [objects.audit.State.CANCELLED, - objects.audit.State.SUSPENDED]: - self.audits[1].state = state - calls = [mock.call(audit_handler.execute_audit, 'interval', - args=[mock.ANY, mock.ANY], - seconds=3600, - name='execute_audit', - next_run_time=mock.ANY)] - audit_handler.launch_audits_periodically() - m_add_job.assert_has_calls(calls) + ap_jobs = [job.Job(mock.MagicMock(), name='execute_audit', + func=audit_handler.execute_audit, + args=(self.audits[0], mock.MagicMock()), + kwargs={}), + job.Job(mock.MagicMock(), name='execute_audit', + func=audit_handler.execute_audit, + args=(self.audits[1], mock.MagicMock()), + kwargs={}) + ] + mock_jobs.return_value = ap_jobs + audit_handler.launch_audits_periodically() - audit_handler.update_audit_state(self.audits[1], state) - is_inactive = audit_handler._is_audit_inactive(self.audits[1]) + audit_handler.update_audit_state(self.audits[1], + objects.audit.State.CANCELLED) + audit_handler.update_audit_state(self.audits[0], + objects.audit.State.SUSPENDED) + is_inactive = audit_handler._is_audit_inactive(self.audits[1]) + self.assertTrue(is_inactive) + is_inactive = audit_handler._is_audit_inactive(self.audits[0]) self.assertTrue(is_inactive) diff --git a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py index 9585382cb..2a72cbad6 100644 --- a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py +++ b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py @@ -16,6 +16,7 @@ import mock +from watcher.decision_engine.audit import continuous as continuous_handler from watcher.decision_engine.audit import oneshot as oneshot_handler from watcher.decision_engine.messaging import audit_endpoint from watcher.decision_engine.model.collector import manager @@ -34,11 +35,12 @@ class TestAuditEndpoint(base.DbTestCase): self.context, audit_template_id=self.audit_template.id) + @mock.patch.object(continuous_handler.ContinuousAuditHandler, 'start') @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - def test_do_trigger_audit(self, mock_collector): + def test_do_trigger_audit(self, mock_collector, mock_handler): mock_collector.return_value = faker_cluster_state.FakerModelCollector() - audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock()) + audit_handler = oneshot_handler.OneShotAuditHandler endpoint = audit_endpoint.AuditEndpoint(audit_handler) with mock.patch.object(oneshot_handler.OneShotAuditHandler, @@ -48,11 +50,12 @@ class TestAuditEndpoint(base.DbTestCase): self.assertEqual(mock_call.call_count, 1) + @mock.patch.object(continuous_handler.ContinuousAuditHandler, 'start') @mock.patch.object(manager.CollectorManager, "get_cluster_model_collector") - def test_trigger_audit(self, mock_collector): + def test_trigger_audit(self, mock_collector, mock_handler): mock_collector.return_value = faker_cluster_state.FakerModelCollector() - audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock()) + audit_handler = oneshot_handler.OneShotAuditHandler endpoint = audit_endpoint.AuditEndpoint(audit_handler) with mock.patch.object(endpoint.executor, 'submit') as mock_call: diff --git a/watcher/tests/decision_engine/model/notification/test_notifications.py b/watcher/tests/decision_engine/model/notification/test_notifications.py index 9dc17cccd..9e2e6fab8 100644 --- a/watcher/tests/decision_engine/model/notification/test_notifications.py +++ b/watcher/tests/decision_engine/model/notification/test_notifications.py @@ -72,8 +72,9 @@ class TestReceiveNotifications(NotificationTestCase): m_from_dict.return_value = self.context self.addCleanup(p_from_dict.stop) + @mock.patch.object(watcher_service.ServiceHeartbeat, 'send_beat') @mock.patch.object(DummyNotification, 'info') - def test_receive_dummy_notification(self, m_info): + def test_receive_dummy_notification(self, m_info, m_heartbeat): message = { 'publisher_id': 'nova-compute', 'event_type': 'compute.dummy', @@ -90,8 +91,9 @@ class TestReceiveNotifications(NotificationTestCase): {'data': {'nested': 'TEST'}}, {'message_id': None, 'timestamp': None}) + @mock.patch.object(watcher_service.ServiceHeartbeat, 'send_beat') @mock.patch.object(DummyNotification, 'info') - def test_skip_unwanted_notification(self, m_info): + def test_skip_unwanted_notification(self, m_info, m_heartbeat): message = { 'publisher_id': 'nova-compute', 'event_type': 'compute.dummy', diff --git a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py index a8f283dff..f257dc793 100644 --- a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py +++ b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py @@ -56,6 +56,10 @@ class TestReceiveNovaNotifications(NotificationTestCase): m_from_dict = p_from_dict.start() m_from_dict.return_value = self.context self.addCleanup(p_from_dict.stop) + p_heartbeat = mock.patch.object( + watcher_service.ServiceHeartbeat, "send_beat") + self.m_heartbeat = p_heartbeat.start() + self.addCleanup(p_heartbeat.stop) @mock.patch.object(novanotification.ServiceUpdated, 'info') def test_nova_receive_service_update(self, m_info):