Add Watcher JobStore for background jobs
This patch set adds WatcherJobStore class that allows to link jobs and services. Partially-Implements: blueprint background-jobs-ha Change-Id: I575887ca6dae60b3b7709e6d2e2b256e09a3d824
This commit is contained in:
@@ -27,6 +27,10 @@ from watcher.tests import base
|
||||
class TestApplierManager(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestApplierManager, self).setUp()
|
||||
p_heartbeat = mock.patch.object(
|
||||
service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
self.applier = service.Service(applier_manager.ApplierManager)
|
||||
|
||||
@mock.patch.object(om.rpc.server.RPCServer, "stop")
|
||||
|
||||
@@ -22,6 +22,7 @@ import types
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
from watcher.common import service as watcher_service
|
||||
|
||||
from watcher.cmd import applier
|
||||
from watcher.tests import base
|
||||
@@ -39,6 +40,10 @@ class TestApplier(base.BaseTestCase):
|
||||
|
||||
_fake_parse_method = types.MethodType(_fake_parse, self.conf)
|
||||
self.conf._parse_cli_opts = _fake_parse_method
|
||||
p_heartbeat = mock.patch.object(
|
||||
watcher_service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestApplier, self).tearDown()
|
||||
|
||||
@@ -24,6 +24,8 @@ from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
|
||||
from watcher.cmd import decisionengine
|
||||
from watcher.common import service as watcher_service
|
||||
from watcher.decision_engine.audit import continuous
|
||||
from watcher.decision_engine import sync
|
||||
from watcher.tests import base
|
||||
|
||||
@@ -42,6 +44,15 @@ class TestDecisionEngine(base.BaseTestCase):
|
||||
_fake_parse_method = types.MethodType(_fake_parse, self.conf)
|
||||
self.conf._parse_cli_opts = _fake_parse_method
|
||||
|
||||
p_heartbeat = mock.patch.object(
|
||||
watcher_service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
p_continuoushandler = mock.patch.object(
|
||||
continuous.ContinuousAuditHandler, "start")
|
||||
self.m_continuoushandler = p_continuoushandler.start()
|
||||
self.addCleanup(p_continuoushandler.stop)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestDecisionEngine, self).tearDown()
|
||||
self.conf._parse_cli_opts = self._parse_cli_opts
|
||||
|
||||
@@ -53,10 +53,9 @@ class TestServiceHeartbeat(base.TestCase):
|
||||
def test_send_beat_with_creating_service(self, mock_create,
|
||||
mock_list):
|
||||
CONF.set_default('host', 'fake-fqdn')
|
||||
service_heartbeat = service.ServiceHeartbeat(
|
||||
service_name='watcher-service')
|
||||
|
||||
mock_list.return_value = []
|
||||
service_heartbeat.send_beat()
|
||||
service.ServiceHeartbeat(service_name='watcher-service')
|
||||
mock_list.assert_called_once_with(mock.ANY,
|
||||
filters={'name': 'watcher-service',
|
||||
'host': 'fake-fqdn'})
|
||||
@@ -65,12 +64,11 @@ class TestServiceHeartbeat(base.TestCase):
|
||||
@mock.patch.object(objects.Service, 'list')
|
||||
@mock.patch.object(objects.Service, 'save')
|
||||
def test_send_beat_without_creating_service(self, mock_save, mock_list):
|
||||
service_heartbeat = service.ServiceHeartbeat(
|
||||
service_name='watcher-service')
|
||||
|
||||
mock_list.return_value = [objects.Service(mock.Mock(),
|
||||
name='watcher-service',
|
||||
host='controller')]
|
||||
service_heartbeat.send_beat()
|
||||
service.ServiceHeartbeat(service_name='watcher-service')
|
||||
self.assertEqual(1, mock_save.call_count)
|
||||
|
||||
|
||||
|
||||
@@ -14,11 +14,14 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from apscheduler.schedulers import background
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from apscheduler import job
|
||||
|
||||
from watcher.applier import rpcapi
|
||||
from watcher.common import scheduling
|
||||
from watcher.db.sqlalchemy import api as sq_api
|
||||
from watcher.decision_engine.audit import continuous
|
||||
from watcher.decision_engine.audit import oneshot
|
||||
from watcher.decision_engine.model.collector import manager
|
||||
@@ -57,7 +60,7 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_without_errors(self, m_collector):
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
expected_calls = [
|
||||
@@ -83,7 +86,7 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
def test_trigger_audit_with_error(self, m_collector, m_do_execute):
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
m_do_execute.side_effect = Exception
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
expected_calls = [
|
||||
@@ -102,7 +105,7 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_state_succeeded(self, m_collector):
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
audit = objects.audit.Audit.get_by_uuid(self.context, self.audit.uuid)
|
||||
self.assertEqual(objects.audit.State.SUCCEEDED, audit.state)
|
||||
@@ -127,9 +130,8 @@ class TestOneShotAuditHandler(base.DbTestCase):
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit_send_notification(self, m_collector):
|
||||
messaging = mock.MagicMock()
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
audit_handler = oneshot.OneShotAuditHandler(messaging)
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
|
||||
expected_calls = [
|
||||
@@ -194,7 +196,7 @@ class TestAutoTriggerActionPlan(base.DbTestCase):
|
||||
def test_trigger_audit_with_actionplan_ongoing(self, mock_list,
|
||||
mock_do_execute):
|
||||
mock_list.return_value = [self.ongoing_action_plan]
|
||||
audit_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot.OneShotAuditHandler()
|
||||
audit_handler.execute(self.audit, self.context)
|
||||
self.assertFalse(mock_do_execute.called)
|
||||
|
||||
@@ -205,9 +207,9 @@ class TestAutoTriggerActionPlan(base.DbTestCase):
|
||||
mock_list, mock_applier):
|
||||
mock_get_by_id.return_value = self.audit
|
||||
mock_list.return_value = []
|
||||
auto_trigger_handler = oneshot.OneShotAuditHandler(mock.MagicMock())
|
||||
with mock.patch.object(auto_trigger_handler, 'do_schedule',
|
||||
new_callable=mock.PropertyMock) as m_schedule:
|
||||
auto_trigger_handler = oneshot.OneShotAuditHandler()
|
||||
with mock.patch.object(auto_trigger_handler,
|
||||
'do_schedule') as m_schedule:
|
||||
m_schedule().uuid = self.recommended_action_plan.uuid
|
||||
auto_trigger_handler.post_execute(self.audit, mock.MagicMock(),
|
||||
self.context)
|
||||
@@ -234,30 +236,39 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
goal=self.goal)
|
||||
for id_ in range(2, 4)]
|
||||
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
def test_launch_audits_periodically(self, mock_list, mock_jobs,
|
||||
m_add_job, m_collector):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
m_add_job, m_engine, m_service):
|
||||
audit_handler = continuous.ContinuousAuditHandler()
|
||||
mock_list.return_value = self.audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
m_engine.return_value = mock.MagicMock()
|
||||
m_add_job.return_value = audit_handler.execute_audit(
|
||||
self.audits[0], self.context)
|
||||
m_collector.return_value = faker.FakerModelCollector()
|
||||
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_service.assert_called()
|
||||
m_engine.assert_called()
|
||||
m_add_job.assert_called()
|
||||
mock_jobs.assert_called()
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
def test_launch_multiply_audits_periodically(self, mock_list,
|
||||
mock_jobs, m_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
mock_jobs, m_add_job,
|
||||
m_engine, m_service):
|
||||
audit_handler = continuous.ContinuousAuditHandler()
|
||||
mock_list.return_value = self.audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
m_engine.return_value = mock.MagicMock()
|
||||
m_service.return_value = mock.MagicMock()
|
||||
calls = [mock.call(audit_handler.execute_audit, 'interval',
|
||||
args=[mock.ANY, mock.ANY],
|
||||
seconds=3600,
|
||||
@@ -266,26 +277,39 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_add_job.assert_has_calls(calls)
|
||||
|
||||
@mock.patch.object(background.BackgroundScheduler, 'add_job')
|
||||
@mock.patch.object(background.BackgroundScheduler, 'get_jobs')
|
||||
@mock.patch.object(objects.service.Service, 'list')
|
||||
@mock.patch.object(sq_api, 'get_engine')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'add_job')
|
||||
@mock.patch.object(scheduling.BackgroundSchedulerService, 'get_jobs')
|
||||
@mock.patch.object(objects.audit.Audit, 'list')
|
||||
def test_period_audit_not_called_when_deleted(self, mock_list,
|
||||
mock_jobs, m_add_job):
|
||||
audit_handler = continuous.ContinuousAuditHandler(mock.MagicMock())
|
||||
mock_jobs, m_add_job,
|
||||
m_engine, m_service):
|
||||
audit_handler = continuous.ContinuousAuditHandler()
|
||||
mock_list.return_value = self.audits
|
||||
mock_jobs.return_value = mock.MagicMock()
|
||||
m_service.return_value = mock.MagicMock()
|
||||
m_engine.return_value = mock.MagicMock()
|
||||
self.audits[1].state = objects.audit.State.CANCELLED
|
||||
self.audits[0].state = objects.audit.State.SUSPENDED
|
||||
|
||||
for state in [objects.audit.State.CANCELLED,
|
||||
objects.audit.State.SUSPENDED]:
|
||||
self.audits[1].state = state
|
||||
calls = [mock.call(audit_handler.execute_audit, 'interval',
|
||||
args=[mock.ANY, mock.ANY],
|
||||
seconds=3600,
|
||||
name='execute_audit',
|
||||
next_run_time=mock.ANY)]
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_add_job.assert_has_calls(calls)
|
||||
ap_jobs = [job.Job(mock.MagicMock(), name='execute_audit',
|
||||
func=audit_handler.execute_audit,
|
||||
args=(self.audits[0], mock.MagicMock()),
|
||||
kwargs={}),
|
||||
job.Job(mock.MagicMock(), name='execute_audit',
|
||||
func=audit_handler.execute_audit,
|
||||
args=(self.audits[1], mock.MagicMock()),
|
||||
kwargs={})
|
||||
]
|
||||
mock_jobs.return_value = ap_jobs
|
||||
audit_handler.launch_audits_periodically()
|
||||
|
||||
audit_handler.update_audit_state(self.audits[1], state)
|
||||
is_inactive = audit_handler._is_audit_inactive(self.audits[1])
|
||||
audit_handler.update_audit_state(self.audits[1],
|
||||
objects.audit.State.CANCELLED)
|
||||
audit_handler.update_audit_state(self.audits[0],
|
||||
objects.audit.State.SUSPENDED)
|
||||
is_inactive = audit_handler._is_audit_inactive(self.audits[1])
|
||||
self.assertTrue(is_inactive)
|
||||
is_inactive = audit_handler._is_audit_inactive(self.audits[0])
|
||||
self.assertTrue(is_inactive)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from watcher.decision_engine.audit import continuous as continuous_handler
|
||||
from watcher.decision_engine.audit import oneshot as oneshot_handler
|
||||
from watcher.decision_engine.messaging import audit_endpoint
|
||||
from watcher.decision_engine.model.collector import manager
|
||||
@@ -34,11 +35,12 @@ class TestAuditEndpoint(base.DbTestCase):
|
||||
self.context,
|
||||
audit_template_id=self.audit_template.id)
|
||||
|
||||
@mock.patch.object(continuous_handler.ContinuousAuditHandler, 'start')
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_do_trigger_audit(self, mock_collector):
|
||||
def test_do_trigger_audit(self, mock_collector, mock_handler):
|
||||
mock_collector.return_value = faker_cluster_state.FakerModelCollector()
|
||||
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler
|
||||
endpoint = audit_endpoint.AuditEndpoint(audit_handler)
|
||||
|
||||
with mock.patch.object(oneshot_handler.OneShotAuditHandler,
|
||||
@@ -48,11 +50,12 @@ class TestAuditEndpoint(base.DbTestCase):
|
||||
|
||||
self.assertEqual(mock_call.call_count, 1)
|
||||
|
||||
@mock.patch.object(continuous_handler.ContinuousAuditHandler, 'start')
|
||||
@mock.patch.object(manager.CollectorManager, "get_cluster_model_collector")
|
||||
def test_trigger_audit(self, mock_collector):
|
||||
def test_trigger_audit(self, mock_collector, mock_handler):
|
||||
mock_collector.return_value = faker_cluster_state.FakerModelCollector()
|
||||
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler(mock.MagicMock())
|
||||
audit_handler = oneshot_handler.OneShotAuditHandler
|
||||
endpoint = audit_endpoint.AuditEndpoint(audit_handler)
|
||||
|
||||
with mock.patch.object(endpoint.executor, 'submit') as mock_call:
|
||||
|
||||
@@ -72,8 +72,9 @@ class TestReceiveNotifications(NotificationTestCase):
|
||||
m_from_dict.return_value = self.context
|
||||
self.addCleanup(p_from_dict.stop)
|
||||
|
||||
@mock.patch.object(watcher_service.ServiceHeartbeat, 'send_beat')
|
||||
@mock.patch.object(DummyNotification, 'info')
|
||||
def test_receive_dummy_notification(self, m_info):
|
||||
def test_receive_dummy_notification(self, m_info, m_heartbeat):
|
||||
message = {
|
||||
'publisher_id': 'nova-compute',
|
||||
'event_type': 'compute.dummy',
|
||||
@@ -90,8 +91,9 @@ class TestReceiveNotifications(NotificationTestCase):
|
||||
{'data': {'nested': 'TEST'}},
|
||||
{'message_id': None, 'timestamp': None})
|
||||
|
||||
@mock.patch.object(watcher_service.ServiceHeartbeat, 'send_beat')
|
||||
@mock.patch.object(DummyNotification, 'info')
|
||||
def test_skip_unwanted_notification(self, m_info):
|
||||
def test_skip_unwanted_notification(self, m_info, m_heartbeat):
|
||||
message = {
|
||||
'publisher_id': 'nova-compute',
|
||||
'event_type': 'compute.dummy',
|
||||
|
||||
@@ -56,6 +56,10 @@ class TestReceiveNovaNotifications(NotificationTestCase):
|
||||
m_from_dict = p_from_dict.start()
|
||||
m_from_dict.return_value = self.context
|
||||
self.addCleanup(p_from_dict.stop)
|
||||
p_heartbeat = mock.patch.object(
|
||||
watcher_service.ServiceHeartbeat, "send_beat")
|
||||
self.m_heartbeat = p_heartbeat.start()
|
||||
self.addCleanup(p_heartbeat.stop)
|
||||
|
||||
@mock.patch.object(novanotification.ServiceUpdated, 'info')
|
||||
def test_nova_receive_service_update(self, m_info):
|
||||
|
||||
Reference in New Issue
Block a user