Define self.client in MessagingCore
Currently self.client is referenced within MessagingCore, but no definition is made in its constructor. Additionally self.client is defined in children classes of MessagingCore. This patchset defines self.client in the constructor of MessagingCore and removes the redefinition in its children. -self.client lazily loaded Co-Authored-By: v-francoise <Vincent.FRANCOISE@b-com.com> Change-Id: I14525a175bf1ebde3d2636024ad2f2219c79d6e1 Closes-Bug: #1521636
This commit is contained in:
committed by
Vincent Françoise
parent
7406a1e713
commit
2f0c1c12cf
@@ -43,8 +43,8 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
|||||||
ev.data = {}
|
ev.data = {}
|
||||||
payload = {'action_plan__uuid': uuid,
|
payload = {'action_plan__uuid': uuid,
|
||||||
'action_plan_state': state}
|
'action_plan_state': state}
|
||||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
self.applier_manager.status_topic_handler.publish_event(
|
||||||
payload)
|
ev.type.name, payload)
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -34,12 +34,12 @@ APPLIER_MANAGER_OPTS = [
|
|||||||
min=1,
|
min=1,
|
||||||
required=True,
|
required=True,
|
||||||
help='Number of workers for applier, default value is 1.'),
|
help='Number of workers for applier, default value is 1.'),
|
||||||
cfg.StrOpt('topic_control',
|
cfg.StrOpt('conductor_topic',
|
||||||
default='watcher.applier.control',
|
default='watcher.applier.control',
|
||||||
help='The topic name used for'
|
help='The topic name used for'
|
||||||
'control events, this topic '
|
'control events, this topic '
|
||||||
'used for rpc call '),
|
'used for rpc call '),
|
||||||
cfg.StrOpt('topic_status',
|
cfg.StrOpt('status_topic',
|
||||||
default='watcher.applier.status',
|
default='watcher.applier.status',
|
||||||
help='The topic name used for '
|
help='The topic name used for '
|
||||||
'status events, this topic '
|
'status events, this topic '
|
||||||
@@ -67,12 +67,13 @@ class ApplierManager(messaging_core.MessagingCore):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(ApplierManager, self).__init__(
|
super(ApplierManager, self).__init__(
|
||||||
CONF.watcher_applier.publisher_id,
|
CONF.watcher_applier.publisher_id,
|
||||||
CONF.watcher_applier.topic_control,
|
CONF.watcher_applier.conductor_topic,
|
||||||
CONF.watcher_applier.topic_status,
|
CONF.watcher_applier.status_topic,
|
||||||
api_version=self.API_VERSION,
|
api_version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
self.topic_control.add_endpoint(trigger.TriggerActionPlan(self))
|
self.conductor_topic_handler.add_endpoint(
|
||||||
|
trigger.TriggerActionPlan(self))
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self.topic_control.join()
|
self.conductor_topic_handler.join()
|
||||||
self.topic_status.join()
|
self.status_topic_handler.join()
|
||||||
|
|||||||
@@ -39,17 +39,17 @@ class ApplierAPI(messaging_core.MessagingCore):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(ApplierAPI, self).__init__(
|
super(ApplierAPI, self).__init__(
|
||||||
CONF.watcher_applier.publisher_id,
|
CONF.watcher_applier.publisher_id,
|
||||||
CONF.watcher_applier.topic_control,
|
CONF.watcher_applier.conductor_topic,
|
||||||
CONF.watcher_applier.topic_status,
|
CONF.watcher_applier.status_topic,
|
||||||
api_version=self.API_VERSION,
|
api_version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
self.handler = notification.NotificationHandler(self.publisher_id)
|
self.handler = notification.NotificationHandler(self.publisher_id)
|
||||||
self.handler.register_observer(self)
|
self.handler.register_observer(self)
|
||||||
self.topic_status.add_endpoint(self.handler)
|
self.status_topic_handler.add_endpoint(self.handler)
|
||||||
transport = om.get_transport(CONF)
|
transport = om.get_transport(CONF)
|
||||||
|
|
||||||
target = om.Target(
|
target = om.Target(
|
||||||
topic=CONF.watcher_applier.topic_control,
|
topic=CONF.watcher_applier.conductor_topic,
|
||||||
version=self.API_VERSION,
|
version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -62,8 +62,8 @@ class BaseWorkFlowEngine(object):
|
|||||||
ev.data = {}
|
ev.data = {}
|
||||||
payload = {'action_uuid': action.uuid,
|
payload = {'action_uuid': action.uuid,
|
||||||
'action_state': state}
|
'action_state': state}
|
||||||
self.applier_manager.topic_status.publish_event(ev.type.name,
|
self.applier_manager.status_topic_handler.publish_event(
|
||||||
payload)
|
ev.type.name, payload)
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def execute(self, actions):
|
def execute(self, actions):
|
||||||
|
|||||||
@@ -16,58 +16,100 @@
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from watcher.common.messaging.events.event_dispatcher import \
|
import oslo_messaging as om
|
||||||
EventDispatcher
|
|
||||||
from watcher.common.messaging.messaging_handler import \
|
|
||||||
MessagingHandler
|
|
||||||
from watcher.common.rpc import RequestContextSerializer
|
|
||||||
|
|
||||||
from watcher.objects.base import WatcherObjectSerializer
|
from watcher.common.messaging.events import event_dispatcher as dispatcher
|
||||||
|
from watcher.common.messaging import messaging_handler
|
||||||
|
from watcher.common import rpc
|
||||||
|
|
||||||
|
from watcher.objects import base
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
class MessagingCore(EventDispatcher):
|
class MessagingCore(dispatcher.EventDispatcher):
|
||||||
|
|
||||||
API_VERSION = '1.0'
|
API_VERSION = '1.0'
|
||||||
|
|
||||||
def __init__(self, publisher_id, topic_control, topic_status,
|
def __init__(self, publisher_id, conductor_topic, status_topic,
|
||||||
api_version=API_VERSION):
|
api_version=API_VERSION):
|
||||||
super(MessagingCore, self).__init__()
|
super(MessagingCore, self).__init__()
|
||||||
self.serializer = RequestContextSerializer(WatcherObjectSerializer())
|
self.serializer = rpc.RequestContextSerializer(
|
||||||
|
base.WatcherObjectSerializer())
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
self.api_version = api_version
|
self.api_version = api_version
|
||||||
self.topic_control = self.build_topic(topic_control)
|
|
||||||
self.topic_status = self.build_topic(topic_status)
|
|
||||||
|
|
||||||
def build_topic(self, topic_name):
|
self.conductor_topic = conductor_topic
|
||||||
return MessagingHandler(self.publisher_id, topic_name, self,
|
self.status_topic = status_topic
|
||||||
self.api_version, self.serializer)
|
self.conductor_topic_handler = self.build_topic_handler(
|
||||||
|
conductor_topic)
|
||||||
|
self.status_topic_handler = self.build_topic_handler(status_topic)
|
||||||
|
|
||||||
|
self._conductor_client = None
|
||||||
|
self._status_client = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def conductor_client(self):
|
||||||
|
if self._conductor_client is None:
|
||||||
|
transport = om.get_transport(CONF)
|
||||||
|
target = om.Target(
|
||||||
|
topic=self.conductor_topic,
|
||||||
|
version=self.API_VERSION,
|
||||||
|
)
|
||||||
|
self._conductor_client = om.RPCClient(
|
||||||
|
transport, target, serializer=self.serializer)
|
||||||
|
return self._conductor_client
|
||||||
|
|
||||||
|
@conductor_client.setter
|
||||||
|
def conductor_client(self, c):
|
||||||
|
self.conductor_client = c
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status_client(self):
|
||||||
|
if self._status_client is None:
|
||||||
|
transport = om.get_transport(CONF)
|
||||||
|
target = om.Target(
|
||||||
|
topic=self.status_topic,
|
||||||
|
version=self.API_VERSION,
|
||||||
|
)
|
||||||
|
self._status_client = om.RPCClient(
|
||||||
|
transport, target, serializer=self.serializer)
|
||||||
|
return self._status_client
|
||||||
|
|
||||||
|
@status_client.setter
|
||||||
|
def status_client(self, c):
|
||||||
|
self.status_client = c
|
||||||
|
|
||||||
|
def build_topic_handler(self, topic_name):
|
||||||
|
return messaging_handler.MessagingHandler(
|
||||||
|
self.publisher_id, topic_name, self,
|
||||||
|
self.api_version, self.serializer)
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
LOG.debug("Connecting to '%s' (%s)",
|
LOG.debug("Connecting to '%s' (%s)",
|
||||||
CONF.transport_url, CONF.rpc_backend)
|
CONF.transport_url, CONF.rpc_backend)
|
||||||
self.topic_control.start()
|
self.conductor_topic_handler.start()
|
||||||
self.topic_status.start()
|
self.status_topic_handler.start()
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
LOG.debug("Disconnecting from '%s' (%s)",
|
LOG.debug("Disconnecting from '%s' (%s)",
|
||||||
CONF.transport_url, CONF.rpc_backend)
|
CONF.transport_url, CONF.rpc_backend)
|
||||||
self.topic_control.stop()
|
self.conductor_topic_handler.stop()
|
||||||
self.topic_status.stop()
|
self.status_topic_handler.stop()
|
||||||
|
|
||||||
def publish_control(self, event, payload):
|
def publish_control(self, event, payload):
|
||||||
return self.topic_control.publish_event(event, payload)
|
return self.conductor_topic_handler.publish_event(event, payload)
|
||||||
|
|
||||||
def publish_status(self, event, payload, request_id=None):
|
def publish_status(self, event, payload, request_id=None):
|
||||||
return self.topic_status.publish_event(event, payload, request_id)
|
return self.status_topic_handler.publish_event(
|
||||||
|
event, payload, request_id)
|
||||||
|
|
||||||
def get_version(self):
|
def get_version(self):
|
||||||
return self.api_version
|
return self.api_version
|
||||||
|
|
||||||
def check_api_version(self, context):
|
def check_api_version(self, context):
|
||||||
api_manager_version = self.client.call(
|
api_manager_version = self.conductor_client.call(
|
||||||
context.to_dict(), 'check_api_version',
|
context.to_dict(), 'check_api_version',
|
||||||
api_version=self.api_version)
|
api_version=self.api_version)
|
||||||
return api_manager_version
|
return api_manager_version
|
||||||
|
|||||||
@@ -38,11 +38,11 @@ CONF = cfg.CONF
|
|||||||
|
|
||||||
class MessagingHandler(threading.Thread):
|
class MessagingHandler(threading.Thread):
|
||||||
|
|
||||||
def __init__(self, publisher_id, topic_watcher, endpoint, version,
|
def __init__(self, publisher_id, topic_name, endpoint, version,
|
||||||
serializer=None):
|
serializer=None):
|
||||||
super(MessagingHandler, self).__init__()
|
super(MessagingHandler, self).__init__()
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
self.topic_watcher = topic_watcher
|
self.topic_name = topic_name
|
||||||
self.__endpoints = []
|
self.__endpoints = []
|
||||||
self.__serializer = serializer
|
self.__serializer = serializer
|
||||||
self.__version = version
|
self.__version = version
|
||||||
@@ -72,7 +72,7 @@ class MessagingHandler(threading.Thread):
|
|||||||
return om.Notifier(
|
return om.Notifier(
|
||||||
self.__transport,
|
self.__transport,
|
||||||
publisher_id=self.publisher_id,
|
publisher_id=self.publisher_id,
|
||||||
topic=self.topic_watcher,
|
topic=self.topic_name,
|
||||||
serializer=serializer
|
serializer=serializer
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -87,7 +87,7 @@ class MessagingHandler(threading.Thread):
|
|||||||
self.__notifier = self.build_notifier()
|
self.__notifier = self.build_notifier()
|
||||||
if len(self.__endpoints):
|
if len(self.__endpoints):
|
||||||
target = om.Target(
|
target = om.Target(
|
||||||
topic=self.topic_watcher,
|
topic=self.topic_name,
|
||||||
# For compatibility, we can override it with 'host' opt
|
# For compatibility, we can override it with 'host' opt
|
||||||
server=CONF.host or socket.getfqdn(),
|
server=CONF.host or socket.getfqdn(),
|
||||||
version=self.__version,
|
version=self.__version,
|
||||||
@@ -101,7 +101,7 @@ class MessagingHandler(threading.Thread):
|
|||||||
LOG.error(_LE("Messaging configuration error"))
|
LOG.error(_LE("Messaging configuration error"))
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.debug("configure MessagingHandler for %s" % self.topic_watcher)
|
LOG.debug("configure MessagingHandler for %s" % self.topic_name)
|
||||||
self._configure()
|
self._configure()
|
||||||
if len(self.__endpoints) > 0:
|
if len(self.__endpoints) > 0:
|
||||||
LOG.debug("Starting up server")
|
LOG.debug("Starting up server")
|
||||||
|
|||||||
@@ -18,17 +18,16 @@ import eventlet
|
|||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
|
|
||||||
from watcher.common.messaging.utils.observable import \
|
from watcher.common.messaging.utils import observable
|
||||||
Observable
|
|
||||||
|
|
||||||
|
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class NotificationHandler(Observable):
|
class NotificationHandler(observable.Observable):
|
||||||
def __init__(self, publisher_id):
|
def __init__(self, publisher_id):
|
||||||
Observable.__init__(self)
|
super(NotificationHandler, self).__init__()
|
||||||
self.publisher_id = publisher_id
|
self.publisher_id = publisher_id
|
||||||
|
|
||||||
def info(self, ctx, publisher_id, event_type, payload, metadata):
|
def info(self, ctx, publisher_id, event_type, payload, metadata):
|
||||||
|
|||||||
@@ -14,19 +14,14 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from oslo_log import log
|
from watcher.common.messaging.utils import synchronization
|
||||||
|
|
||||||
from watcher.common.messaging.utils.synchronization import \
|
|
||||||
Synchronization
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class Observable(Synchronization):
|
class Observable(synchronization.Synchronization):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
super(Observable, self).__init__()
|
||||||
self.__observers = []
|
self.__observers = []
|
||||||
self.changed = 0
|
self.changed = 0
|
||||||
Synchronization.__init__(self)
|
|
||||||
|
|
||||||
def set_changed(self):
|
def set_changed(self):
|
||||||
self.changed = 1
|
self.changed = 1
|
||||||
|
|||||||
@@ -54,8 +54,8 @@ class DefaultAuditHandler(base.BaseAuditHandler):
|
|||||||
event.data = {}
|
event.data = {}
|
||||||
payload = {'audit_uuid': audit_uuid,
|
payload = {'audit_uuid': audit_uuid,
|
||||||
'audit_status': status}
|
'audit_status': status}
|
||||||
self.messaging.topic_status.publish_event(event.type.name,
|
self.messaging.status_topic_handler.publish_event(
|
||||||
payload)
|
event.type.name, payload)
|
||||||
|
|
||||||
def update_audit_state(self, request_context, audit_uuid, state):
|
def update_audit_state(self, request_context, audit_uuid, state):
|
||||||
LOG.debug("Update audit state: %s", state)
|
LOG.debug("Update audit state: %s", state)
|
||||||
|
|||||||
@@ -40,20 +40,20 @@ See :doc:`../architecture` for more details on this component.
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
from watcher.common.messaging.messaging_core import MessagingCore
|
from watcher.common.messaging import messaging_core
|
||||||
from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint
|
from watcher.decision_engine.messaging import audit_endpoint
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
WATCHER_DECISION_ENGINE_OPTS = [
|
WATCHER_DECISION_ENGINE_OPTS = [
|
||||||
cfg.StrOpt('topic_control',
|
cfg.StrOpt('conductor_topic',
|
||||||
default='watcher.decision.control',
|
default='watcher.decision.control',
|
||||||
help='The topic name used for'
|
help='The topic name used for'
|
||||||
'control events, this topic '
|
'control events, this topic '
|
||||||
'used for rpc call '),
|
'used for rpc call '),
|
||||||
cfg.StrOpt('topic_status',
|
cfg.StrOpt('status_topic',
|
||||||
default='watcher.decision.status',
|
default='watcher.decision.status',
|
||||||
help='The topic name used for '
|
help='The topic name used for '
|
||||||
'status events, this topic '
|
'status events, this topic '
|
||||||
@@ -78,18 +78,18 @@ CONF.register_group(decision_engine_opt_group)
|
|||||||
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
|
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
|
||||||
|
|
||||||
|
|
||||||
class DecisionEngineManager(MessagingCore):
|
class DecisionEngineManager(messaging_core.MessagingCore):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(DecisionEngineManager, self).__init__(
|
super(DecisionEngineManager, self).__init__(
|
||||||
CONF.watcher_decision_engine.publisher_id,
|
CONF.watcher_decision_engine.publisher_id,
|
||||||
CONF.watcher_decision_engine.topic_control,
|
CONF.watcher_decision_engine.conductor_topic,
|
||||||
CONF.watcher_decision_engine.topic_status,
|
CONF.watcher_decision_engine.status_topic,
|
||||||
api_version=self.API_VERSION)
|
api_version=self.API_VERSION)
|
||||||
endpoint = AuditEndpoint(self,
|
endpoint = audit_endpoint.AuditEndpoint(
|
||||||
max_workers=CONF.watcher_decision_engine.
|
self,
|
||||||
max_workers)
|
max_workers=CONF.watcher_decision_engine.max_workers)
|
||||||
self.topic_control.add_endpoint(endpoint)
|
self.conductor_topic_handler.add_endpoint(endpoint)
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self.topic_control.join()
|
self.conductor_topic_handler.join()
|
||||||
self.topic_status.join()
|
self.status_topic_handler.join()
|
||||||
|
|||||||
@@ -19,10 +19,10 @@
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging as om
|
|
||||||
|
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
from watcher.common.messaging.messaging_core import MessagingCore
|
from watcher.common.messaging.messaging_core import MessagingCore
|
||||||
|
from watcher.common.messaging.notification_handler import NotificationHandler
|
||||||
from watcher.common import utils
|
from watcher.common import utils
|
||||||
from watcher.decision_engine.manager import decision_engine_opt_group
|
from watcher.decision_engine.manager import decision_engine_opt_group
|
||||||
from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS
|
from watcher.decision_engine.manager import WATCHER_DECISION_ENGINE_OPTS
|
||||||
@@ -40,22 +40,16 @@ class DecisionEngineAPI(MessagingCore):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(DecisionEngineAPI, self).__init__(
|
super(DecisionEngineAPI, self).__init__(
|
||||||
CONF.watcher_decision_engine.publisher_id,
|
CONF.watcher_decision_engine.publisher_id,
|
||||||
CONF.watcher_decision_engine.topic_control,
|
CONF.watcher_decision_engine.conductor_topic,
|
||||||
CONF.watcher_decision_engine.topic_status,
|
CONF.watcher_decision_engine.status_topic,
|
||||||
api_version=self.API_VERSION,
|
api_version=self.API_VERSION,
|
||||||
)
|
)
|
||||||
|
self.handler = NotificationHandler(self.publisher_id)
|
||||||
transport = om.get_transport(CONF)
|
self.status_topic_handler.add_endpoint(self.handler)
|
||||||
target = om.Target(
|
|
||||||
topic=CONF.watcher_decision_engine.topic_control,
|
|
||||||
version=self.API_VERSION,
|
|
||||||
)
|
|
||||||
self.client = om.RPCClient(transport, target,
|
|
||||||
serializer=self.serializer)
|
|
||||||
|
|
||||||
def trigger_audit(self, context, audit_uuid=None):
|
def trigger_audit(self, context, audit_uuid=None):
|
||||||
if not utils.is_uuid_like(audit_uuid):
|
if not utils.is_uuid_like(audit_uuid):
|
||||||
raise exception.InvalidUuidOrName(name=audit_uuid)
|
raise exception.InvalidUuidOrName(name=audit_uuid)
|
||||||
|
|
||||||
return self.client.call(
|
return self.conductor_client.call(
|
||||||
context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid)
|
context.to_dict(), 'trigger_audit', audit_uuid=audit_uuid)
|
||||||
|
|||||||
@@ -15,9 +15,8 @@
|
|||||||
# implied.
|
# implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
|
||||||
from mock import call
|
import mock
|
||||||
from mock import MagicMock
|
|
||||||
|
|
||||||
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
from watcher.applier.action_plan.default import DefaultActionPlanHandler
|
||||||
from watcher.applier.messaging.event_types import EventTypes
|
from watcher.applier.messaging.event_types import EventTypes
|
||||||
@@ -34,7 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
|||||||
self.context)
|
self.context)
|
||||||
|
|
||||||
def test_launch_action_plan(self):
|
def test_launch_action_plan(self):
|
||||||
command = DefaultActionPlanHandler(self.context, MagicMock(),
|
command = DefaultActionPlanHandler(self.context, mock.MagicMock(),
|
||||||
self.action_plan.uuid)
|
self.action_plan.uuid)
|
||||||
command.execute()
|
command.execute()
|
||||||
action_plan = ActionPlan.get_by_uuid(self.context,
|
action_plan = ActionPlan.get_by_uuid(self.context,
|
||||||
@@ -42,18 +41,19 @@ class TestDefaultActionPlanHandler(DbTestCase):
|
|||||||
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
|
self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state)
|
||||||
|
|
||||||
def test_trigger_audit_send_notification(self):
|
def test_trigger_audit_send_notification(self):
|
||||||
messaging = MagicMock()
|
messaging = mock.MagicMock()
|
||||||
command = DefaultActionPlanHandler(self.context, messaging,
|
command = DefaultActionPlanHandler(self.context, messaging,
|
||||||
self.action_plan.uuid)
|
self.action_plan.uuid)
|
||||||
command.execute()
|
command.execute()
|
||||||
|
|
||||||
call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
call_on_going = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||||
'action_plan_state': ap_objects.State.ONGOING,
|
'action_plan_state': ap_objects.State.ONGOING,
|
||||||
'action_plan__uuid': self.action_plan.uuid})
|
'action_plan__uuid': self.action_plan.uuid})
|
||||||
call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
call_succeeded = mock.call(EventTypes.LAUNCH_ACTION_PLAN.name, {
|
||||||
'action_plan_state': ap_objects.State.SUCCEEDED,
|
'action_plan_state': ap_objects.State.SUCCEEDED,
|
||||||
'action_plan__uuid': self.action_plan.uuid})
|
'action_plan__uuid': self.action_plan.uuid})
|
||||||
|
|
||||||
calls = [call_on_going, call_succeeded]
|
calls = [call_on_going, call_succeeded]
|
||||||
messaging.topic_status.publish_event.assert_has_calls(calls)
|
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||||
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
|
self.assertEqual(
|
||||||
|
2, messaging.status_topic_handler.publish_event.call_count)
|
||||||
|
|||||||
@@ -15,59 +15,79 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
from mock import patch
|
import mock
|
||||||
|
|
||||||
from watcher.common.messaging.messaging_core import MessagingCore
|
from watcher.common.messaging import messaging_core
|
||||||
from watcher.common.messaging.messaging_handler import MessagingHandler
|
from watcher.common.messaging import messaging_handler
|
||||||
from watcher.common.rpc import RequestContextSerializer
|
from watcher.common import rpc
|
||||||
from watcher.tests.base import TestCase
|
from watcher.tests import base
|
||||||
|
|
||||||
|
|
||||||
class TestMessagingCore(TestCase):
|
class TestMessagingCore(base.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestMessagingCore, self).setUp()
|
super(TestMessagingCore, self).setUp()
|
||||||
|
|
||||||
def test_build_topic(self):
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
|
def test_connect(self, m_handler):
|
||||||
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
|
messaging.connect()
|
||||||
|
self.assertEqual(m_handler.call_count, 2)
|
||||||
|
|
||||||
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
|
def test_disconnect(self, m_handler):
|
||||||
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
|
messaging.disconnect()
|
||||||
|
self.assertEqual(m_handler.call_count, 2)
|
||||||
|
|
||||||
|
def test_build_topic_handler(self):
|
||||||
topic_name = "MyTopic"
|
topic_name = "MyTopic"
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging_handler = messaging.build_topic(topic_name)
|
handler = messaging.build_topic_handler(topic_name)
|
||||||
self.assertIsNotNone(messaging_handler)
|
self.assertIsNotNone(handler)
|
||||||
|
|
||||||
def test_init_messaging_core(self):
|
def test_init_messaging_core(self):
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
self.assertIsInstance(messaging.serializer,
|
self.assertIsInstance(messaging.serializer,
|
||||||
RequestContextSerializer)
|
rpc.RequestContextSerializer)
|
||||||
self.assertIsInstance(messaging.topic_control, MessagingHandler)
|
self.assertIsInstance(
|
||||||
self.assertIsInstance(messaging.topic_status, MessagingHandler)
|
messaging.conductor_topic_handler,
|
||||||
|
messaging_handler.MessagingHandler)
|
||||||
|
self.assertIsInstance(
|
||||||
|
messaging.status_topic_handler,
|
||||||
|
messaging_handler.MessagingHandler)
|
||||||
|
|
||||||
@patch.object(MessagingCore, 'publish_control')
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
def test_publish_control(self, mock_call):
|
def test_publish_control(self, m_handler_cls):
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler_cls.return_value = m_handler
|
||||||
payload = {
|
payload = {
|
||||||
"name": "value",
|
"name": "value",
|
||||||
}
|
}
|
||||||
event = "MyEvent"
|
event = "MyEvent"
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging.publish_control(event, payload)
|
messaging.publish_control(event, payload)
|
||||||
mock_call.assert_called_once_with(event, payload)
|
m_handler.publish_event.assert_called_once_with(event, payload)
|
||||||
|
|
||||||
@patch.object(MessagingCore, 'publish_status')
|
@mock.patch.object(messaging_handler, "MessagingHandler")
|
||||||
def test_publish_status(self, mock_call):
|
def test_publish_status(self, m_handler_cls):
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler_cls.return_value = m_handler
|
||||||
payload = {
|
payload = {
|
||||||
"name": "value",
|
"name": "value",
|
||||||
}
|
}
|
||||||
event = "MyEvent"
|
event = "MyEvent"
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging.publish_status(event, payload)
|
messaging.publish_status(event, payload)
|
||||||
mock_call.assert_called_once_with(event, payload)
|
m_handler.publish_event.assert_called_once_with(event, payload, None)
|
||||||
|
|
||||||
@patch.object(MessagingCore, 'publish_status')
|
@mock.patch.object(messaging_core.MessagingCore, 'publish_status')
|
||||||
def test_response(self, mock_call):
|
def test_response(self, mock_call):
|
||||||
event = "My event"
|
event = "My event"
|
||||||
context = {'request_id': 12}
|
context = {'request_id': 12}
|
||||||
message = "My Message"
|
message = "My Message"
|
||||||
|
|
||||||
messaging = MessagingCore("", "", "")
|
messaging = messaging_core.MessagingCore("", "", "")
|
||||||
messaging.response(event, context, message)
|
messaging.response(event, context, message)
|
||||||
|
|
||||||
expected_payload = {
|
expected_payload = {
|
||||||
@@ -76,13 +96,15 @@ class TestMessagingCore(TestCase):
|
|||||||
}
|
}
|
||||||
mock_call.assert_called_once_with(event, expected_payload)
|
mock_call.assert_called_once_with(event, expected_payload)
|
||||||
|
|
||||||
def test_messaging_build_topic(self):
|
def test_messaging_build_topic_handler(self):
|
||||||
messaging = MessagingCore("pub_id", "test_topic", "does not matter")
|
messaging = messaging_core.MessagingCore(
|
||||||
topic = messaging.build_topic("test_topic")
|
"pub_id", "test_topic", "does not matter")
|
||||||
|
topic = messaging.build_topic_handler("test_topic")
|
||||||
|
|
||||||
self.assertIsInstance(topic, MessagingHandler)
|
self.assertIsInstance(topic, messaging_handler.MessagingHandler)
|
||||||
self.assertEqual(messaging.publisher_id, "pub_id")
|
self.assertEqual(messaging.publisher_id, "pub_id")
|
||||||
self.assertEqual(topic.publisher_id, "pub_id")
|
self.assertEqual(topic.publisher_id, "pub_id")
|
||||||
|
|
||||||
self.assertEqual(messaging.topic_control.topic_watcher, "test_topic")
|
self.assertEqual(
|
||||||
self.assertEqual(topic.topic_watcher, "test_topic")
|
messaging.conductor_topic_handler.topic_name, "test_topic")
|
||||||
|
self.assertEqual(topic.topic_name, "test_topic")
|
||||||
|
|||||||
@@ -14,17 +14,16 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from mock import Mock
|
import mock
|
||||||
from mock import patch
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
from watcher.common.messaging.messaging_handler import MessagingHandler
|
from watcher.common.messaging import messaging_handler
|
||||||
from watcher.tests.base import TestCase
|
from watcher.tests import base
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
class TestMessagingHandler(TestCase):
|
class TestMessagingHandler(base.TestCase):
|
||||||
|
|
||||||
PUBLISHER_ID = 'TEST_API'
|
PUBLISHER_ID = 'TEST_API'
|
||||||
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
|
TOPIC_WATCHER = 'TEST_TOPIC_WATCHER'
|
||||||
@@ -35,20 +34,20 @@ class TestMessagingHandler(TestCase):
|
|||||||
super(TestMessagingHandler, self).setUp()
|
super(TestMessagingHandler, self).setUp()
|
||||||
CONF.set_default('host', 'fake-fqdn')
|
CONF.set_default('host', 'fake-fqdn')
|
||||||
|
|
||||||
@patch.object(messaging, "get_rpc_server")
|
@mock.patch.object(messaging, "get_rpc_server")
|
||||||
@patch.object(messaging, "Target")
|
@mock.patch.object(messaging, "Target")
|
||||||
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
|
def test_setup_messaging_handler(self, m_target_cls, m_get_rpc_server):
|
||||||
m_target = Mock()
|
m_target = mock.Mock()
|
||||||
m_target_cls.return_value = m_target
|
m_target_cls.return_value = m_target
|
||||||
messaging_handler = MessagingHandler(
|
handler = messaging_handler.MessagingHandler(
|
||||||
publisher_id=self.PUBLISHER_ID,
|
publisher_id=self.PUBLISHER_ID,
|
||||||
topic_watcher=self.TOPIC_WATCHER,
|
topic_name=self.TOPIC_WATCHER,
|
||||||
endpoint=self.ENDPOINT,
|
endpoint=self.ENDPOINT,
|
||||||
version=self.VERSION,
|
version=self.VERSION,
|
||||||
serializer=None,
|
serializer=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
messaging_handler.run()
|
handler.run()
|
||||||
|
|
||||||
m_target_cls.assert_called_once_with(
|
m_target_cls.assert_called_once_with(
|
||||||
server="fake-fqdn",
|
server="fake-fqdn",
|
||||||
@@ -56,23 +55,23 @@ class TestMessagingHandler(TestCase):
|
|||||||
version="1.0",
|
version="1.0",
|
||||||
)
|
)
|
||||||
m_get_rpc_server.assert_called_once_with(
|
m_get_rpc_server.assert_called_once_with(
|
||||||
messaging_handler.transport,
|
handler.transport,
|
||||||
m_target,
|
m_target,
|
||||||
[self.ENDPOINT],
|
[self.ENDPOINT],
|
||||||
serializer=None,
|
serializer=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_messaging_handler_remove_endpoint(self):
|
def test_messaging_handler_remove_endpoint(self):
|
||||||
messaging_handler = MessagingHandler(
|
handler = messaging_handler.MessagingHandler(
|
||||||
publisher_id=self.PUBLISHER_ID,
|
publisher_id=self.PUBLISHER_ID,
|
||||||
topic_watcher=self.TOPIC_WATCHER,
|
topic_name=self.TOPIC_WATCHER,
|
||||||
endpoint=self.ENDPOINT,
|
endpoint=self.ENDPOINT,
|
||||||
version=self.VERSION,
|
version=self.VERSION,
|
||||||
serializer=None,
|
serializer=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(messaging_handler.endpoints, [self.ENDPOINT])
|
self.assertEqual(handler.endpoints, [self.ENDPOINT])
|
||||||
|
|
||||||
messaging_handler.remove_endpoint(self.ENDPOINT)
|
handler.remove_endpoint(self.ENDPOINT)
|
||||||
|
|
||||||
self.assertEqual(messaging_handler.endpoints, [])
|
self.assertEqual(handler.endpoints, [])
|
||||||
|
|||||||
@@ -63,5 +63,6 @@ class TestDefaultAuditHandler(base.DbTestCase):
|
|||||||
'audit_uuid': self.audit.uuid})
|
'audit_uuid': self.audit.uuid})
|
||||||
|
|
||||||
calls = [call_on_going, call_succeeded]
|
calls = [call_on_going, call_succeeded]
|
||||||
messaging.topic_status.publish_event.assert_has_calls(calls)
|
messaging.status_topic_handler.publish_event.assert_has_calls(calls)
|
||||||
self.assertEqual(2, messaging.topic_status.publish_event.call_count)
|
self.assertEqual(
|
||||||
|
2, messaging.status_topic_handler.publish_event.call_count)
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ class MyObj2(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TestSubclassedObject(MyObj):
|
class DummySubclassedObject(MyObj):
|
||||||
fields = {'new_field': str}
|
fields = {'new_field': str}
|
||||||
|
|
||||||
|
|
||||||
@@ -438,13 +438,13 @@ class _TestObject(object):
|
|||||||
base_fields = base.WatcherObject.fields.keys()
|
base_fields = base.WatcherObject.fields.keys()
|
||||||
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
|
myobj_fields = ['foo', 'bar', 'missing'] + base_fields
|
||||||
myobj3_fields = ['new_field']
|
myobj3_fields = ['new_field']
|
||||||
self.assertTrue(issubclass(TestSubclassedObject, MyObj))
|
self.assertTrue(issubclass(DummySubclassedObject, MyObj))
|
||||||
self.assertEqual(len(myobj_fields), len(MyObj.fields))
|
self.assertEqual(len(myobj_fields), len(MyObj.fields))
|
||||||
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
|
self.assertEqual(set(myobj_fields), set(MyObj.fields.keys()))
|
||||||
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
|
self.assertEqual(len(myobj_fields) + len(myobj3_fields),
|
||||||
len(TestSubclassedObject.fields))
|
len(DummySubclassedObject.fields))
|
||||||
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
|
self.assertEqual(set(myobj_fields) | set(myobj3_fields),
|
||||||
set(TestSubclassedObject.fields.keys()))
|
set(DummySubclassedObject.fields.keys()))
|
||||||
|
|
||||||
def test_get_changes(self):
|
def test_get_changes(self):
|
||||||
obj = MyObj(self.context)
|
obj = MyObj(self.context)
|
||||||
|
|||||||
Reference in New Issue
Block a user