Merge "Define self.client in MessagingCore"
This commit is contained in:
@@ -43,8 +43,8 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
|||||||
ev.data = {}
|
ev.data = {}
|
||||||
payload = {'action_plan__uuid': uuid,
|
payload = {'action_plan__uuid': uuid,
|
||||||
'action_plan_state': state}
|
'action_plan_state': state}
|
||||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
self.applier_manager.status_topic_handler.publish_event(
|
||||||
payload)
|
ev.type.name, payload)
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -34,12 +34,12 @@ APPLIER_MANAGER_OPTS = [
|
|||||||
min=1,
|
min=1,
|
||||||
required=True,
|
required=True,
|
||||||
help='Number of workers for applier, default value is 1.'),
|
help='Number of workers for applier, default value is 1.'),
|
||||||
cfg.StrOpt('topic_control',
|
cfg.StrOpt('conductor_topic',
|
||||||
default='watcher.applier.control',
|
default='watcher.applier.control',
|
||||||
help='The topic name used for'
|
help='The topic name used for'
|
||||||
'control events, this topic '
|
'control events, this topic '
|
||||||
'used for rpc call '),
|
'used for rpc call '),
|
||||||
cfg.StrOpt('topic_status',
|
cfg.StrOpt('status_topic',
|
||||||
default='watcher.applier.status',
|
default='watcher.applier.status',
|
||||||
help='The topic name used for '
|
help='The topic name used for '
|
||||||
'status events, this topic '
|
'status events, this topic '
|
||||||
@@ -67,12 +67,13 @@ class ApplierManager(messaging_core.MessagingCore):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(ApplierManager, self).__init__(
|
super(ApplierManager, self).__init__(
|
||||||
CONF.watcher_applier.publisher_id,
|
CONF.watcher_applier.publisher_id,
|
||||||
CONF.watcher_applier.topic_control,
|
CONF.watcher_applier.conductor_topic,
|
||||||
CONF.watcher_applier.topic_status,
|
CONF.watcher_applier.status_topic,
|
||||||
api_version=self.API_VERSION,
|
api_version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
self.topic_control.add_endpoint(trigger.TriggerActionPlan(self))
|
self.conductor_topic_handler.add_endpoint(
|
||||||
|
trigger.TriggerActionPlan(self))
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self.topic_control.join()
|
self.conductor_topic_handler.join()
|
||||||
self.topic_status.join()
|
self.status_topic_handler.join()
|
||||||
|
|||||||
@@ -39,17 +39,17 @@ class ApplierAPI(messaging_core.MessagingCore):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(ApplierAPI, self).__init__(
|
super(ApplierAPI, self).__init__(
|
||||||
CONF.watcher_applier.publisher_id,
|
CONF.watcher_applier.publisher_id,
|
||||||
CONF.watcher_applier.topic_control,
|
CONF.watcher_applier.conductor_topic,
|
||||||
CONF.watcher_applier.topic_status,
|
CONF.watcher_applier.status_topic,
|
||||||
api_version=self.API_VERSION,
|
api_version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
self.handler = notification.NotificationHandler(self.publisher_id)
|
self.handler = notification.NotificationHandler(self.publisher_id)
|
||||||
self.handler.register_observer(self)
|
self.handler.register_observer(self)
|
||||||
self.topic_status.add_endpoint(self.handler)
|
self.status_topic_handler.add_endpoint(self.handler)
|
||||||
transport = om.get_transport(CONF)
|
transport = om.get_transport(CONF)
|
||||||
|
|
||||||
target = om.Target(
|
target = om.Target(
|
||||||
topic=CONF.watcher_applier.topic_control,
|
topic=CONF.watcher_applier.conductor_topic,
|
||||||
version=self.API_VERSION,
|
version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -62,8 +62,8 @@ class BaseWorkFlowEngine(object):
|
|||||||
ev.data = {}
|
ev.data = {}
|
||||||
payload = {'action_uuid': action.uuid,
|
payload = {'action_uuid': action.uuid,
|
||||||
'action_state': state}
|
'action_state': state}
|
||||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
self.applier_manager.status_topic_handler.publish_event(
|
||||||
payload)
|
ev.type.name, payload)
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def execute(self, actions):
|
def execute(self, actions):
|
||||||
|
|||||||
@@ -16,58 +16,100 @@
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from watcher.common.messaging.events.event_dispatcher import \
|
import oslo_messaging as om
|
||||||
EventDispatcher
|
|
||||||
from watcher.common.messaging.messaging_handler import \
|
|
||||||
MessagingHandler
|
|
||||||
from watcher.common.rpc import RequestContextSerializer
|
|
||||||
|
|
||||||
from watcher.objects.base import WatcherObjectSerializer
|
from watcher.common.messaging.events import event_dispatcher as dispatcher
|
||||||
|
from watcher.common.messaging import messaging_handler
|
||||||
|
from watcher.common import rpc
|
||||||
|
|
||||||
|
from watcher.objects import base
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
class MessagingCore(EventDispatcher):
|
class MessagingCore(dispatcher.EventDispatcher):
|
||||||
|
|
||||||
API_VERSION = '1.0'
|
API_VERSION = '1.0'
|
||||||
|
|
||||||
def __init__(self, publisher_id, topic_control, topic_status,
|
def __init__(self, publisher_id, conductor_topic, status_topic,
|
||||||
api_version=API_VERSION):
|
api_version=API_VERSION):
|
||||||
super(MessagingCore, self).__init__()
|
super(MessagingCore, self).__init__()
|
||||||
self.serializer = RequestContextSerializer(WatcherObjectSerializer())
|
self.serializer = rpc.RequestContextSerializer(
|
||||||
|
base.WatcherObjectSerializer())
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
self.api_version = api_version
|
self.api_version = api_version
|
||||||
self.topic_control = self.build_topic(topic_control)
|
|
||||||
self.topic_status = self.build_topic(topic_status)
|
|
||||||
|
|
||||||
def build_topic(self, topic_name):
|
self.conductor_topic = conductor_topic
|
||||||
return MessagingHandler(self.publisher_id, topic_name, self,
|
self.status_topic = status_topic
|
||||||
self.api_version, self.serializer)
|
self.conductor_topic_handler = self.build_topic_handler(
|
||||||
|
conductor_topic)
|
||||||
|
self.status_topic_handler = self.build_topic_handler(status_topic)
|
||||||
|
|
||||||
|
self._conductor_client = None
|
||||||
|
self._status_client = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def conductor_client(self):
|
||||||
|
if self._conductor_client is None:
|
||||||
|
transport = om.get_transport(CONF)
|
||||||
|
target = om.Target(
|
||||||
|
topic=self.conductor_topic,
|
||||||
|
version=self.API_VERSION,
|
||||||
|
)
|
||||||
|
self._conductor_client = om.RPCClient(
|
||||||
|
transport, target, serializer=self.serializer)
|
||||||
|
return self._conductor_client
|
||||||
|
|
||||||
|
@conductor_client.setter
|
||||||
|
def conductor_client(self, c):
|
||||||
|
self.conductor_client = c
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status_client(self):
|
||||||
|
if self._status_client is None:
|
||||||
|
transport = om.get_transport(CONF)
|
||||||
|
target = om.Target(
|
||||||
|
topic=self.status_topic,
|
||||||
|
version=self.API_VERSION,
|
||||||
|
)
|
||||||
|
self._status_client = om.RPCClient(
|
||||||
|
transport, target, serializer=self.serializer)
|
||||||
|
return self._status_client
|
||||||
|
|
||||||
|
@status_client.setter
|
||||||
|
def status_client(self, c):
|
||||||
|
self.status_client = c
|
||||||
|
|
||||||
|
def build_topic_handler(self, topic_name):
|
||||||
|
return messaging_handler.MessagingHandler(
|
||||||
|
self.publisher_id, topic_name, self,
|
||||||
|
self.api_version, self.serializer)
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
LOG.debug("Connecting to '%s' (%s)",
|
LOG.debug("Connecting to '%s' (%s)",
|
||||||
CONF.transport_url, CONF.rpc_backend)
|
CONF.transport_url, CONF.rpc_backend)
|
||||||
self.topic_control.start()
|
self.conductor_topic_handler.start()
|
||||||
self.topic_status.start()
|
self.status_topic_handler.start()
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
LOG.debug("Disconnecting from '%s' (%s)",
|
LOG.debug("Disconnecting from '%s' (%s)",
|
||||||
CONF.transport_url, CONF.rpc_backend)
|
CONF.transport_url, CONF.rpc_backend)
|
||||||
self.topic_control.stop()
|
self.conductor_topic_handler.stop()
|
||||||
self.topic_status.stop()
|
self.status_topic_handler.stop()
|
||||||
|
|
||||||
def publish_control(self, event, payload):
|
def publish_control(self, event, payload):
|
||||||
return self.topic_control.publish_event(event, payload)
|
return self.conductor_topic_handler.publish_event(event, payload)
|
||||||
|
|
||||||
def publish_status(self, event, payload, request_id=None):
|
def publish_status(self, event, payload, request_id=None):
|
||||||
return self.topic_status.publish_event(event, payload, request_id)
|
return self.status_topic_handler.publish_event(
|
||||||
|
event, payload, request_id)
|
||||||
|
|
||||||
def get_version(self):
|
def get_version(self):
|
||||||
return self.api_version
|
return self.api_version
|
||||||
|
|
||||||
def check_api_version(self, context):
|
def check_api_version(self, context):
|
||||||
api_manager_version = self.client.call(
|
api_manager_version = self.conductor_client.call(
|
||||||
context.to_dict(), 'check_api_version',
|
context.to_dict(), 'check_api_version',
|
||||||
api_version=self.api_version)
|
api_version=self.api_version)
|
||||||
return api_manager_version
|
return api_manager_version
|
||||||
|
|||||||
@@ -38,11 +38,11 @@ CONF = cfg.CONF
|
|||||||
|
|
||||||
class MessagingHandler(threading.Thread):
|
class MessagingHandler(threading.Thread):
|
||||||
|
|
||||||
def __init__(self, publisher_id, topic_watcher, endpoint, version,
|
def __init__(self, publisher_id, topic_name, endpoint, version,
|
||||||
serializer=None):
|
serializer=None):
|
||||||
super(MessagingHandler, self).__init__()
|
super(MessagingHandler, self).__init__()
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
self.topic_watcher = topic_watcher
|
self.topic_name = topic_name
|
||||||
self.__endpoints = []
|
self.__endpoints = []
|
||||||
self.__serializer = serializer
|
self.__serializer = serializer
|
||||||
self.__version = version
|
self.__version = version
|
||||||
@@ -72,7 +72,7 @@ class MessagingHandler(threading.Thread):
|
|||||||
return om.Notifier(
|
return om.Notifier(
|
||||||
self.__transport,
|
self.__transport,
|
||||||
publisher_id=self.publisher_id,
|
publisher_id=self.publisher_id,
|
||||||
topic=self.topic_watcher,
|
topic=self.topic_name,
|
||||||
serializer=serializer
|
serializer=serializer
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -87,7 +87,7 @@ class MessagingHandler(threading.Thread):
|
|||||||
self.__notifier = self.build_notifier()
|
self.__notifier = self.build_notifier()
|
||||||
if len(self.__endpoints):
|
if len(self.__endpoints):
|
||||||
target = om.Target(
|
target = om.Target(
|
||||||
topic=self.topic_watcher,
|
topic=self.topic_name,
|
||||||
# For compatibility, we can override it with 'host' opt
|
# For compatibility, we can override it with 'host' opt
|
||||||
server=CONF.host or socket.getfqdn(),
|
server=CONF.host or socket.getfqdn(),
|
||||||
version=self.__version,
|
version=self.__version,
|
||||||
@@ -101,7 +101,7 @@ class MessagingHandler(threading.Thread):
|
|||||||
LOG.error(_LE("Messaging configuration error"))
|
LOG.error(_LE("Messaging configuration error"))
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.debug("configure MessagingHandler for %s" % self.topic_watcher)
|
LOG.debug("configure MessagingHandler for %s" % self.topic_name)
|
||||||
self._configure()
|
self._configure()
|
||||||
if len(self.__endpoints) > 0:
|
if len(self.__endpoints) > 0:
|
||||||
LOG.debug("Starting up server")
|
LOG.debug("Starting up server")
|
||||||
|
|||||||
@@ -18,17 +18,16 @@ import eventlet
|
|||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
|
|
||||||
from watcher.common.messaging.utils.observable import \
|
from watcher.common.messaging.utils import observable
|
||||||
Observable
|
|
||||||
|
|
||||||
|
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class NotificationHandler(Observable):
|
class NotificationHandler(observable.Observable):
|
||||||
def __init__(self, publisher_id):
|
def __init__(self, publisher_id):
|
||||||
Observable.__init__(self)
|
super(NotificationHandler, self).__init__()
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
|
|
||||||
def info(self, ctx, publisher_id, event_type, payload, metadata):
|
def info(self, ctx, publisher_id, event_type, payload, metadata):
|
||||||
|
|||||||
@@ -14,19 +14,14 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from oslo_log import log
|
from watcher.common.messaging.utils import synchronization
|
||||||
|
|
||||||
from watcher.common.messaging.utils.synchronization import \
|
|
||||||
Synchronization
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class Observable(Synchronization):
|
class Observable(synchronization.Synchronization):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
super(Observable, self).__init__()
|
||||||
self.__observers = []
|
self.__observers = []
|
||||||
self.changed = 0
|
self.changed = 0
|
||||||
Synchronization.__init__(self)
|
|
||||||
|
|
||||||
def set_changed(self):
|
def set_changed(self):
|
||||||
self.changed = 1
|
self.changed = 1
|
||||||
|
|||||||
@@ -54,8 +54,8 @@ class DefaultAuditHandler(base.BaseAuditHandler):
|
|||||||
event.data = {}
|
event.data = {}
|
||||||
payload = {'audit_uuid': audit_uuid,
|
payload = {'audit_uuid': audit_uuid,
|
||||||
'audit_status': status}
|
'audit_status': status}
|
||||||
self.messaging.topic_status.publish_event(event.type.name,
|
self.messaging.status_topic_handler.publish_event(
|
||||||
payload)
|
event.type.name, payload)
|
||||||
|
|
||||||
def update_audit_state(self, request_context, audit_uuid, state):
|
def update_audit_state(self, request_context, audit_uuid, state):
|
||||||
LOG.debug("Update audit state: %s", state)
|
LOG.debug("Update audit state: %s", state)
|
||||||
|
|||||||
@@ -40,20 +40,20 @@ See :doc:`../architecture` for more details on this component.
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
from watcher.common.messaging.messaging_core import MessagingCore
|
from watcher.common.messaging import messaging_core
|
||||||
from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint
|
from watcher.decision_engine.messaging import audit_endpoint
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
WATCHER_DECISION_ENGINE_OPTS = [
|
WATCHER_DECISION_ENGINE_OPTS = [
|
||||||
cfg.StrOpt('topic_control',
|
cfg.StrOpt('conductor_topic',
|
||||||
default='watcher.decision.control',
|
default='watcher.decision.control',
|
||||||
help='The topic name used for'
|
help='The topic name used for'
|
||||||
'control events, this topic '
|
'control events, this topic '
|
||||||
'used for rpc call '),
|
'used for rpc call '),
|
||||||
cfg.StrOpt('topic_status',
|
cfg.StrOpt('status_topic',
|
||||||
default='watcher.decision.status',
|
default='watcher.decision.status',
|
||||||
help='The topic name used for '
|
help='The topic name used for '
|
||||||
'status events, this topic '
|
'status events, this topic '
|
||||||
@@ -78,18 +78,18 @@ CONF.register_group(decision_engine_opt_group)
|
|||||||
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
|
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
|
||||||
|
|
||||||
|
|
||||||
class DecisionEngineManager(MessagingCore):
|
class DecisionEngineManager(messaging_core.MessagingCore):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(DecisionEngineManager, self).__init__(
|
super(DecisionEngineManager, self).__init__(
|
||||||
CONF.watcher_decision_engine.publisher_id,
|
CONF.watcher_decision_engine.publisher_id,
|
||||||
CONF.watcher_decision_engine.topic_control,
|
CONF.watcher_decision_engine.conductor_topic,
|
||||||
CONF.watcher_decision_engine.topic_status,
|
CONF.watcher_decision_engine.status_topic,
|
||||||
api_version=self.API_VERSION)
|
api_version=self.API_VERSION)
|
||||||
endpoint = AuditEndpoint(self,
|
endpoint = audit_endpoint.AuditEndpoint(
|
||||||
max_workers=CONF.watcher_decision_engine.
|
self,
|
||||||
max_workers)
|
max_workers=CONF.watcher_decision_engine.max_workers)
|
||||||
self.topic_control.add_endpoint(endpoint)
|
self.conductor_topic_handler.add_endpoint(endpoint)
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self.topic_control.join()
|
self.conductor_topic_handler.join()
|
||||||
self.topic_status.join()
|
self.status_topic_handler.join()
|
||||||
|
|||||||
@@ -19,10 +19,10 @@
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging as om
|
|
||||||
|
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
from watcher.common.messaging.messaging_core import MessagingCore
|
from watcher.common.messaging.messaging_core import MessagingCore
|
||||||
|
from watcher.common.messaging.notification_handler import NotificationHandler
|
||||||
from watcher.common import utils
|
from watcher.common import utils
|
||||||
from watcher.decision_engine.manager import decision_engine_opt_group
|
from watcher.decision_engine.manager import decision_engine_opt_group
|
||||||
from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS
|
from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS
|
||||||
@@ -40,22 +40,16 @@ class DecisionEngineAPI(MessagingCore):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(DecisionEngineAPI, self).__init__(
|
super(DecisionEngineAPI, self).__init__(
|
||||||
CONF.watcher_decision_engine.publisher_id,
|
CONF.watcher_decision_engine.publisher_id,
|
||||||
CONF.watcher_decision_engine.topic_control,
|
CONF.watcher_decision_engine.conductor_topic,
|
||||||
CONF.watcher_decision_engine.topic_status,
|
CONF.watcher_decision_engine.status_topic,
|
||||||
api_version=self.API_VERSION,
|
api_version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
|
self.handler = NotificationHandler(self.publisher_id)
|
||||||
transport = om.get_transport(CONF)
|
self.status_topic_handler.add_endpoint(self.handler)
|
||||||
target = om.Target(
|
|
||||||
topic=CONF.watcher_decision_engine.topic_control,
|
|
||||||
version=self.API_VERSION,
|
|
||||||
)
|
|
||||||
self.client = om.RPCClient(transport, target,
|
|
||||||
serializer=self.serializer)
|
|
||||||
|
|
||||||
def trigger_audit(self, context, audit_uuid=None):
|
def trigger_audit(self, context, audit_uuid=None):
|
||||||
if not utils.is_uuid_like(audit_uuid):
|
if not utils.is_uuid_like(audit_uuid):
|
||||||
raise exception.InvalidUuidOrName(name=audit_uuid)
|
raise exception.InvalidUuidOrName(name=audit_uuid)
|
||||||
|
|
||||||
return self.client.call(
|
return self.conductor_client.call(
|
||||||
context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid)
|
context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid)
|
||||||
|
|||||||
@@ -15,9 +15,8 @@
|
|||||||
# implied.
|
# implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
|
||||||
from mock import call
|
import mock
|
||||||
from mock import MagicMock
|
|
||||||
|
|
||||||
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
||||||
from watcher.applier.messaging.event_types import EventTypes
|
from watcher.applier.messaging.event_types import EventTypes
|
||||||
@@ -34,7 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
|||||||
self.context)
|
self.context)
|
||||||
|
|
||||||
def test_launch_action_plan(self):
|
def test_launch_action_plan(self):
|
||||||
command = DefaultActionPlanHandler(self.context, MagicMock(),
|
command = DefaultActionPlanHandler(self.context, mock.MagicMock(),
|
||||||
self.action_plan.uuid)
|
self.action_plan.uuid)
|
||||||
command.execute()
|
command.execute()
|
||||||
action_plan = ActionPlan.get_by_uuid(self.context,
|
action_plan = ActionPlan.get_by_uuid(self.context,
|
||||||
@@ -42,18 +41,19 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
|||||||
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
|
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
|
||||||
|
|
||||||
def test_trigger_audit_send_notification(self):
|
def test_trigger_audit_send_notification(self):
|
||||||
messaging = MagicMock()
|
messaging = mock.MagicMock()
|
||||||
command = DefaultActionPlanHandler(self.context, messaging,
|
command = DefaultActionPlanHandler(self.context, messaging,
|
||||||
self.action_plan.uuid)
|
self.action_plan.uuid)
|
||||||
command.execute()
|
command.execute()
|
||||||
|
|
||||||
call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
call_on_going = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||||
'action_plan_state': ap_objects.State.ONGOING,
|
'action_plan_state': ap_objects.State.ONGOING,
|
||||||
'action_plan__uuid': self.action_plan.uuid})
|
'action_plan__uuid': self.action_plan.uuid})
|
||||||
call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
call_succeeded = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||||
'action_plan_state': ap_objects.State.SUCCEEDED,
|
'action_plan_state': ap_objects.State.SUCCEEDED,
|
||||||
'action_plan__uuid': self.action_plan.uuid})
|
'action_plan__uuid': self.action_plan.uuid})
|
||||||
|
|
||||||
calls = [call_on_going, call_succeeded]
|
calls = [call_on_going, call_succeeded]
|
||||||
messaging.topic_status.publish_event.assert_has_calls(calls)
|
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||||
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
|
self.assertEqual(
|
||||||
|
2, messaging.status_topic_handler.publish_event.call_count)
|
||||||
|
|||||||
@@ -15,59 +15,79 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
from mock import patch
|
import mock
|
||||||
|
|
||||||
from watcher.common.messaging.messaging_core import MessagingCore
|
from watcher.common.messaging import messaging_core
|
||||||
from watcher.common.messaging.messaging_handler import MessagingHandler
|
from watcher.common.messaging import messaging_handler
|
||||||
from watcher.common.rpc import RequestContextSerializer
|
from watcher.common import rpc
|
||||||
from watcher.tests.base import TestCase
|
from watcher.tests import base
|
||||||
|
|
||||||
|
|
||||||
class TestMessagingCore(TestCase):
|
class TestMessagingCore(base.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestMessagingCore, self).setUp()
|
super(TestMessagingCore, self).setUp()
|
||||||
|
|
||||||
def test_build_topic(self):
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
|
def test_connect(self, m_handler):
|
||||||
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
|
messaging.connect()
|
||||||
|
self.assertEqual(m_handler.call_count, 2)
|
||||||
|
|
||||||
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
|
def test_disconnect(self, m_handler):
|
||||||
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
|
messaging.disconnect()
|
||||||
|
self.assertEqual(m_handler.call_count, 2)
|
||||||
|
|
||||||
|
def test_build_topic_handler(self):
|
||||||
topic_name = "MyTopic"
|
topic_name = "MyTopic"
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging_handler = messaging.build_topic(topic_name)
|
handler = messaging.build_topic_handler(topic_name)
|
||||||
self.assertIsNotNone(messaging_handler)
|
self.assertIsNotNone(handler)
|
||||||
|
|
||||||
def test_init_messaging_core(self):
|
def test_init_messaging_core(self):
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
self.assertIsInstance(messaging.serializer,
|
self.assertIsInstance(messaging.serializer,
|
||||||
RequestContextSerializer)
|
rpc.RequestContextSerializer)
|
||||||
self.assertIsInstance(messaging.topic_control, MessagingHandler)
|
self.assertIsInstance(
|
||||||
self.assertIsInstance(messaging.topic_status, MessagingHandler)
|
messaging.conductor_topic_handler,
|
||||||
|
messaging_handler.MessagingHandler)
|
||||||
|
self.assertIsInstance(
|
||||||
|
messaging.status_topic_handler,
|
||||||
|
messaging_handler.MessagingHandler)
|
||||||
|
|
||||||
@patch.object(MessagingCore, 'publish_control')
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
def test_publish_control(self, mock_call):
|
def test_publish_control(self, m_handler_cls):
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler_cls.return_value = m_handler
|
||||||
payload = {
|
payload = {
|
||||||
"name": "value",
|
"name": "value",
|
||||||
}
|
}
|
||||||
event = "MyEvent"
|
event = "MyEvent"
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging.publish_control(event, payload)
|
messaging.publish_control(event, payload)
|
||||||
mock_call.assert_called_once_with(event, payload)
|
m_handler.publish_event.assert_called_once_with(event, payload)
|
||||||
|
|
||||||
@patch.object(MessagingCore, 'publish_status')
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
def test_publish_status(self, mock_call):
|
def test_publish_status(self, m_handler_cls):
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler_cls.return_value = m_handler
|
||||||
payload = {
|
payload = {
|
||||||
"name": "value",
|
"name": "value",
|
||||||
}
|
}
|
||||||
event = "MyEvent"
|
event = "MyEvent"
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging.publish_status(event, payload)
|
messaging.publish_status(event, payload)
|
||||||
mock_call.assert_called_once_with(event, payload)
|
m_handler.publish_event.assert_called_once_with(event, payload, None)
|
||||||
|
|
||||||
@patch.object(MessagingCore, 'publish_status')
|
@mock.patch.object(messaging_core.MessagingCore, 'publish_status')
|
||||||
def test_response(self, mock_call):
|
def test_response(self, mock_call):
|
||||||
event = "My event"
|
event = "My event"
|
||||||
context = {'request_id': 12}
|
context = {'request_id': 12}
|
||||||
message = "My Message"
|
message = "My Message"
|
||||||
|
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging.response(event, context, message)
|
messaging.response(event, context, message)
|
||||||
|
|
||||||
expected_payload = {
|
expected_payload = {
|
||||||
@@ -76,13 +96,15 @@ class TestMessagingCore(TestCase):
|
|||||||
}
|
}
|
||||||
mock_call.assert_called_once_with(event, expected_payload)
|
mock_call.assert_called_once_with(event, expected_payload)
|
||||||
|
|
||||||
def test_messaging_build_topic(self):
|
def test_messaging_build_topic_handler(self):
|
||||||
messaging = MessagingCore("pub_id", "test_topic", "does not matter")
|
messaging = messaging_core.MessagingCore(
|
||||||
topic = messaging.build_topic("test_topic")
|
"pub_id", "test_topic", "does not matter")
|
||||||
|
topic = messaging.build_topic_handler("test_topic")
|
||||||
|
|
||||||
self.assertIsInstance(topic, MessagingHandler)
|
self.assertIsInstance(topic, messaging_handler.MessagingHandler)
|
||||||
self.assertEqual(messaging.publisher_id, "pub_id")
|
self.assertEqual(messaging.publisher_id, "pub_id")
|
||||||
self.assertEqual(topic.publisher_id, "pub_id")
|
self.assertEqual(topic.publisher_id, "pub_id")
|
||||||
|
|
||||||
self.assertEqual(messaging.topic_control.topic_watcher, "test_topic")
|
self.assertEqual(
|
||||||
self.assertEqual(topic.topic_watcher, "test_topic")
|
messaging.conductor_topic_handler.topic_name, "test_topic")
|
||||||
|
self.assertEqual(topic.topic_name, "test_topic")
|
||||||
|
|||||||
@@ -14,17 +14,16 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from mock import Mock
|
import mock
|
||||||
from mock import patch
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
from watcher.common.messaging.messaging_handler import MessagingHandler
|
from watcher.common.messaging import messaging_handler
|
||||||
from watcher.tests.base import TestCase
|
from watcher.tests import base
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
class TestMessagingHandler(TestCase):
|
class TestMessagingHandler(base.TestCase):
|
||||||
|
|
||||||
PUBLISHER_ID = 'TEST_API'
|
PUBLISHER_ID = 'TEST_API'
|
||||||
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
|
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
|
||||||
@@ -35,20 +34,20 @@ class TestMessagingHandler(TestCase):
|
|||||||
super(TestMessagingHandler, self).setUp()
|
super(TestMessagingHandler, self).setUp()
|
||||||
CONF.set_default('host', 'fake-fqdn')
|
CONF.set_default('host', 'fake-fqdn')
|
||||||
|
|
||||||
@patch.object(messaging, "get_rpc_server")
|
@mock.patch.object(messaging, "get_rpc_server")
|
||||||
@patch.object(messaging, "Target")
|
@mock.patch.object(messaging, "Target")
|
||||||
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
|
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
|
||||||
m_target = Mock()
|
m_target = mock.Mock()
|
||||||
m_target_cls.return_value = m_target
|
m_target_cls.return_value = m_target
|
||||||
messaging_handler = MessagingHandler(
|
handler = messaging_handler.MessagingHandler(
|
||||||
publisher_id=self.PUBLISHER_ID,
|
publisher_id=self.PUBLISHER_ID,
|
||||||
topic_watcher=self.TOPIC_WATCHER,
|
topic_name=self.TOPIC_WATCHER,
|
||||||
endpoint=self.ENDPOINT,
|
endpoint=self.ENDPOINT,
|
||||||
version=self.VERSION,
|
version=self.VERSION,
|
||||||
serializer=None,
|
serializer=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
messaging_handler.run()
|
handler.run()
|
||||||
|
|
||||||
m_target_cls.assert_called_once_with(
|
m_target_cls.assert_called_once_with(
|
||||||
server="fake-fqdn",
|
server="fake-fqdn",
|
||||||
@@ -56,23 +55,23 @@ class TestMessagingHandler(TestCase):
|
|||||||
version="1.0",
|
version="1.0",
|
||||||
)
|
)
|
||||||
m_get_rpc_server.assert_called_once_with(
|
m_get_rpc_server.assert_called_once_with(
|
||||||
messaging_handler.transport,
|
handler.transport,
|
||||||
m_target,
|
m_target,
|
||||||
[self.ENDPOINT],
|
[self.ENDPOINT],
|
||||||
serializer=None,
|
serializer=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_messaging_handler_remove_endpoint(self):
|
def test_messaging_handler_remove_endpoint(self):
|
||||||
messaging_handler = MessagingHandler(
|
handler = messaging_handler.MessagingHandler(
|
||||||
publisher_id=self.PUBLISHER_ID,
|
publisher_id=self.PUBLISHER_ID,
|
||||||
topic_watcher=self.TOPIC_WATCHER,
|
topic_name=self.TOPIC_WATCHER,
|
||||||
endpoint=self.ENDPOINT,
|
endpoint=self.ENDPOINT,
|
||||||
version=self.VERSION,
|
version=self.VERSION,
|
||||||
serializer=None,
|
serializer=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(messaging_handler.endpoints, [self.ENDPOINT])
|
self.assertEqual(handler.endpoints, [self.ENDPOINT])
|
||||||
|
|
||||||
messaging_handler.remove_endpoint(self.ENDPOINT)
|
handler.remove_endpoint(self.ENDPOINT)
|
||||||
|
|
||||||
self.assertEqual(messaging_handler.endpoints, [])
|
self.assertEqual(handler.endpoints, [])
|
||||||
|
|||||||
@@ -63,5 +63,6 @@ class TestDefaultAuditHandler(base.DbTestCase):
|
|||||||
'audit_uuid': self.audit.uuid})
|
'audit_uuid': self.audit.uuid})
|
||||||
|
|
||||||
calls = [call_on_going, call_succeeded]
|
calls = [call_on_going, call_succeeded]
|
||||||
messaging.topic_status.publish_event.assert_has_calls(calls)
|
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||||
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
|
self.assertEqual(
|
||||||
|
2, messaging.status_topic_handler.publish_event.call_count)
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ class MyObj2(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TestSubclassedObject(MyObj):
|
class DummySubclassedObject(MyObj):
|
||||||
fields = {'new_field': str}
|
fields = {'new_field': str}
|
||||||
|
|
||||||
|
|
||||||
@@ -438,13 +438,13 @@ class _TestObject(object):
|
|||||||
base_fields = base.WatcherObject.fields.keys()
|
base_fields = base.WatcherObject.fields.keys()
|
||||||
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
|
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
|
||||||
myobj3_fields = ['new_field']
|
myobj3_fields = ['new_field']
|
||||||
self.assertTrue(issubclass(TestSubclassedObject, MyObj))
|
self.assertTrue(issubclass(DummySubclassedObject, MyObj))
|
||||||
self.assertEqual(len(myobj_fields), len(MyObj.fields))
|
self.assertEqual(len(myobj_fields), len(MyObj.fields))
|
||||||
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
|
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
|
||||||
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
|
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
|
||||||
len(TestSubclassedObject.fields))
|
len(DummySubclassedObject.fields))
|
||||||
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
|
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
|
||||||
set(TestSubclassedObject.fields.keys()))
|
set(DummySubclassedObject.fields.keys()))
|
||||||
|
|
||||||
def test_get_changes(self):
|
def test_get_changes(self):
|
||||||
obj = MyObj(self.context)
|
obj = MyObj(self.context)
|
||||||
|
|||||||
Reference in New Issue
Block a user