diff --git a/watcher/applier/action_plan/default.py b/watcher/applier/action_plan/default.py index 529022480..fa05e1bf4 100644 --- a/watcher/applier/action_plan/default.py +++ b/watcher/applier/action_plan/default.py @@ -35,16 +35,16 @@ class DefaultActionPlanHandler(BaseActionPlanHandler): self.action_plan_uuid = action_plan_uuid self.manager_applier = manager_applier - def notify(self, uuid, event_type, status): + def notify(self, uuid, event_type, state): action_plan = ActionPlan.get_by_uuid(self.ctx, uuid) - action_plan.state = status + action_plan.state = state action_plan.save() event = Event() - event.set_type(event_type) - event.set_data({}) + event.type = event_type + event.data = {} payload = {'action_plan__uuid': uuid, - 'action_plan_status': status} - self.manager_applier.topic_status.publish_event(event.get_type().name, + 'action_plan_state': state} + self.manager_applier.topic_status.publish_event(event.type.name, payload) def execute(self): diff --git a/watcher/applier/execution/executor.py b/watcher/applier/execution/executor.py index 4de26d58e..16b977357 100644 --- a/watcher/applier/execution/executor.py +++ b/watcher/applier/execution/executor.py @@ -42,11 +42,11 @@ class ActionPlanExecutor(object): db_action.state = state db_action.save() event = Event() - event.set_type(Events.LAUNCH_ACTION) - event.set_data({}) + event.type = Events.LAUNCH_ACTION + event.data = {} payload = {'action_uuid': action.uuid, - 'action_status': state} - self.manager_applier.topic_status.publish_event(event.get_type().name, + 'action_state': state} + self.manager_applier.topic_status.publish_event(event.type.name, payload) def execute(self, actions): diff --git a/watcher/applier/manager.py b/watcher/applier/manager.py index 00ef62475..d4150ca03 100644 --- a/watcher/applier/manager.py +++ b/watcher/applier/manager.py @@ -23,9 +23,6 @@ from oslo_log import log from watcher.applier.messaging.trigger import TriggerActionPlan from watcher.common.messaging.messaging_core import MessagingCore -from watcher.common.messaging.notification_handler import NotificationHandler -from watcher.decision_engine.messaging.events import Events - LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -59,8 +56,6 @@ CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) class ApplierManager(MessagingCore): - # todo(jed) need workflow - def __init__(self): super(ApplierManager, self).__init__( CONF.watcher_applier.publisher_id, @@ -70,24 +65,9 @@ class ApplierManager(MessagingCore): ) # shared executor of the workflow self.executor = ThreadPoolExecutor(max_workers=1) - self.handler = NotificationHandler(self.publisher_id) - self.handler.register_observer(self) - self.add_event_listener(Events.ALL, self.event_receive) # trigger action_plan self.topic_control.add_endpoint(TriggerActionPlan(self)) def join(self): self.topic_control.join() self.topic_status.join() - - def event_receive(self, event): - try: - request_id = event.get_request_id() - event_type = event.get_type() - data = event.get_data() - LOG.debug("request id => %s" % request_id) - LOG.debug("type_event => %s" % str(event_type)) - LOG.debug("data => %s" % str(data)) - except Exception as e: - LOG.exception(e) - raise diff --git a/watcher/common/messaging/events/event.py b/watcher/common/messaging/events/event.py index 096ed52a0..8c58112b8 100644 --- a/watcher/common/messaging/events/event.py +++ b/watcher/common/messaging/events/event.py @@ -29,20 +29,26 @@ class Event(object): self._data = data self._request_id = request_id - def get_type(self): + @property + def type(self): return self._type - def set_type(self, type): + @type.setter + def type(self, type): self._type = type - def get_data(self): + @property + def data(self): return self._data - def set_data(self, data): + @data.setter + def data(self, data): self._data = data - def set_request_id(self, id): - self._request_id = id - - def get_request_id(self): + @property + def request_id(self): return self._request_id + + @request_id.setter + def request_id(self, id): + self._request_id = id diff --git a/watcher/common/messaging/events/event_dispatcher.py b/watcher/common/messaging/events/event_dispatcher.py index cbc2de78a..6805065e2 100644 --- a/watcher/common/messaging/events/event_dispatcher.py +++ b/watcher/common/messaging/events/event_dispatcher.py @@ -39,7 +39,7 @@ class EventDispatcher(object): return False def dispatch_event(self, event): - LOG.debug("dispatch evt : %s" % str(event.get_type())) + LOG.debug("dispatch evt : %s" % str(event.type)) """ Dispatch an instance of Event class """ @@ -49,8 +49,8 @@ class EventDispatcher(object): listener(event) # Dispatch the event to all the associated listeners - if event.get_type() in self._events.keys(): - listeners = self._events[event.get_type()] + if event.type in self._events.keys(): + listeners = self._events[event.type] for listener in listeners: listener(event) diff --git a/watcher/decision_engine/audit/default.py b/watcher/decision_engine/audit/default.py index 91d36dcbf..027fad0c5 100644 --- a/watcher/decision_engine/audit/default.py +++ b/watcher/decision_engine/audit/default.py @@ -50,11 +50,11 @@ class DefaultAuditHandler(BaseAuditHandler): def notify(self, audit_uuid, event_type, status): event = Event() - event.set_type(event_type) - event.set_data({}) + event.type = event_type + event.data = {} payload = {'audit_uuid': audit_uuid, 'audit_status': status} - self.messaging.topic_status.publish_event(event.get_type().name, + self.messaging.topic_status.publish_event(event.type.name, payload) def update_audit_state(self, request_context, audit_uuid, state): diff --git a/watcher/decision_engine/event/__init__.py b/watcher/decision_engine/event/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/decision_engine/event/consumer_factory.py b/watcher/decision_engine/event/consumer_factory.py deleted file mode 100644 index ea53cc8a8..000000000 --- a/watcher/decision_engine/event/consumer_factory.py +++ /dev/null @@ -1,27 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class EventConsumerFactory(object): - - def factory(self, type): - """Factory so as to create - - :param type: - :return: - """ - # return eval(type + "()") - raise AssertionError() diff --git a/watcher/decision_engine/manager.py b/watcher/decision_engine/manager.py index ac352b4b5..e088094c1 100644 --- a/watcher/decision_engine/manager.py +++ b/watcher/decision_engine/manager.py @@ -21,10 +21,8 @@ from oslo_config import cfg from oslo_log import log from watcher.common.messaging.messaging_core import MessagingCore -from watcher.common.messaging.notification_handler import NotificationHandler -from watcher.decision_engine.event.consumer_factory import EventConsumerFactory from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint -from watcher.decision_engine.messaging.events import Events + LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -67,9 +65,6 @@ class DecisionEngineManager(MessagingCore): CONF.watcher_decision_engine.topic_control, CONF.watcher_decision_engine.topic_status, api_version=self.API_VERSION) - self.handler = NotificationHandler(self.publisher_id) - self.handler.register_observer(self) - self.add_event_listener(Events.ALL, self.event_receive) endpoint = AuditEndpoint(self, max_workers=CONF.watcher_decision_engine. max_workers) @@ -78,20 +73,3 @@ class DecisionEngineManager(MessagingCore): def join(self): self.topic_control.join() self.topic_status.join() - - # TODO(ebe): Producer / consumer - def event_receive(self, event): - try: - request_id = event.get_request_id() - event_type = event.get_type() - data = event.get_data() - LOG.debug("request id => %s" % event.get_request_id()) - LOG.debug("type_event => %s" % str(event.get_type())) - LOG.debug("data => %s" % str(data)) - - event_consumer = EventConsumerFactory().factory(event_type) - event_consumer.messaging = self - event_consumer.execute(request_id, data) - except Exception as e: - LOG.error("evt %s" % e.message) - raise e diff --git a/watcher/decision_engine/messaging/event_consumer.py b/watcher/decision_engine/messaging/event_consumer.py deleted file mode 100644 index 19fc44942..000000000 --- a/watcher/decision_engine/messaging/event_consumer.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import abc -import six - - -@six.add_metaclass(abc.ABCMeta) -class BaseEventConsumer(object): - - def __init__(self): - self._messaging = None - - @property - def messaging(self): - return self._messaging - - @messaging.setter - def messaging(self, e): - self._messaging = e - - @abc.abstractmethod - def execute(self, request_id, context, data): - raise NotImplementedError() diff --git a/watcher/decision_engine/rpcapi.py b/watcher/decision_engine/rpcapi.py index d40731992..beb05cdad 100644 --- a/watcher/decision_engine/rpcapi.py +++ b/watcher/decision_engine/rpcapi.py @@ -23,12 +23,10 @@ import oslo_messaging as om from watcher.common import exception from watcher.common.messaging.messaging_core import MessagingCore -from watcher.common.messaging.notification_handler import NotificationHandler from watcher.common import utils -from watcher.decision_engine.event.consumer_factory import EventConsumerFactory from watcher.decision_engine.manager import decision_engine_opt_group from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS -from watcher.decision_engine.messaging.events import Events + LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -46,17 +44,12 @@ class DecisionEngineAPI(MessagingCore): CONF.watcher_decision_engine.topic_status, api_version=self.API_VERSION, ) - self.handler = NotificationHandler(self.publisher_id) - self.handler.register_observer(self) - self.add_event_listener(Events.ALL, self.event_receive) - self.topic_status.add_endpoint(self.handler) transport = om.get_transport(CONF) target = om.Target( topic=CONF.watcher_decision_engine.topic_control, version=self.API_VERSION, ) - self.client = om.RPCClient(transport, target, serializer=self.serializer) @@ -66,20 +59,3 @@ class DecisionEngineAPI(MessagingCore): return self.client.call( context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid) - - # TODO(ebe): Producteur / consommateur implementer - def event_receive(self, event): - - try: - request_id = event.get_request_id() - event_type = event.get_type() - data = event.get_data() - LOG.debug("request id => %s" % event.get_request_id()) - LOG.debug("type_event => %s" % str(event.get_type())) - LOG.debug("data => %s" % str(data)) - - event_consumer = EventConsumerFactory.factory(event_type) - event_consumer.execute(request_id, self.context, data) - except Exception as e: - LOG.exception(e) - raise diff --git a/watcher/tests/applier/test_applier_manager.py b/watcher/tests/applier/test_applier_manager.py index a707df1dd..9502c9da0 100644 --- a/watcher/tests/applier/test_applier_manager.py +++ b/watcher/tests/applier/test_applier_manager.py @@ -16,8 +16,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +from mock import patch +from threading import Thread + from watcher.applier.manager import ApplierManager -from watcher.common.messaging.events.event import Event +from watcher.common.messaging.messaging_core import MessagingCore from watcher.tests import base @@ -26,6 +30,10 @@ class TestApplierManager(base.TestCase): super(TestApplierManager, self).setUp() self.applier = ApplierManager() - def test_evt(self): - e = Event() - self.applier.event_receive(e) + @patch.object(MessagingCore, "connect") + @patch.object(Thread, "join") + def test_connect(self, m_messaging, m_thread): + self.applier.connect() + self.applier.join() + self.assertEqual(m_messaging.call_count, 2) + self.assertEqual(m_thread.call_count, 1) diff --git a/watcher/tests/common/messaging/event/test_event_dispatcher.py b/watcher/tests/common/messaging/event/test_event_dispatcher.py index 92bf4b899..30ec8fd51 100644 --- a/watcher/tests/common/messaging/event/test_event_dispatcher.py +++ b/watcher/tests/common/messaging/event/test_event_dispatcher.py @@ -32,7 +32,7 @@ class TestEventDispatcher(base.TestCase): def fake_event(self, event_type): event = Event() - event.set_type(event_type) + event.type = event_type return event def test_add_listener(self): diff --git a/watcher/tests/decision_engine/audit/test_event_consumer_factory.py b/watcher/tests/decision_engine/audit/test_event_consumer_factory.py deleted file mode 100644 index 5729267f4..000000000 --- a/watcher/tests/decision_engine/audit/test_event_consumer_factory.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from watcher.decision_engine.event.consumer_factory import EventConsumerFactory -from watcher.decision_engine.messaging.events import Events -from watcher.tests import base - - -class TestEventConsumerFactory(base.TestCase): - - event_consumer_factory = EventConsumerFactory() - - def test_factory_with_unknown_type(self): - self.assertRaises(AssertionError, - self.event_consumer_factory.factory, - Events.ALL) diff --git a/watcher/tests/decision_engine/test_manager.py b/watcher/tests/decision_engine/test_manager.py deleted file mode 100644 index 0288df6c0..000000000 --- a/watcher/tests/decision_engine/test_manager.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import mock - -from watcher.common import utils - -from watcher.decision_engine.event.consumer_factory import EventConsumerFactory - -from watcher.common.messaging.events.event import Event -from watcher.decision_engine.manager import DecisionEngineManager - -from watcher.decision_engine.messaging.events import Events -from watcher.tests import base - - -class TestDecisionEngineManager(base.TestCase): - def setUp(self): - super(TestDecisionEngineManager, self).setUp() - self.manager = DecisionEngineManager() - - def test_event_receive(self): - # todo(jed) remove useless - with mock.patch.object(EventConsumerFactory, 'factory') as mock_call: - data = {"key1": "value"} - request_id = utils.generate_uuid() - event_type = Events.TRIGGER_AUDIT - event = Event(event_type, data, request_id) - self.manager.event_receive(event) - mock_call.assert_called_once_with(event_type)