diff --git a/devstack/lib/watcher b/devstack/lib/watcher index b2b3a14fe..011fa12aa 100644 --- a/devstack/lib/watcher +++ b/devstack/lib/watcher @@ -44,6 +44,9 @@ WATCHER_CONF_DIR=/etc/watcher WATCHER_CONF=$WATCHER_CONF_DIR/watcher.conf WATCHER_POLICY_JSON=$WATCHER_CONF_DIR/policy.json +NOVA_CONF_DIR=/etc/nova +NOVA_CONF=$NOVA_CONF_DIR/nova.conf + if is_ssl_enabled_service "watcher" || is_service_enabled tls-proxy; then WATCHER_SERVICE_PROTOCOL="https" fi @@ -123,6 +126,8 @@ function create_watcher_conf { iniset $WATCHER_CONF oslo_messaging_rabbit rabbit_password $RABBIT_PASSWORD iniset $WATCHER_CONF oslo_messaging_rabbit rabbit_host $RABBIT_HOST + iniset $NOVA_CONF oslo_messaging_notifications topics "notifications,watcher_notifications" + configure_auth_token_middleware $WATCHER_CONF watcher $WATCHER_AUTH_CACHE_DIR configure_auth_token_middleware $WATCHER_CONF watcher $WATCHER_AUTH_CACHE_DIR "watcher_clients_auth" diff --git a/watcher/applier/action_plan/default.py b/watcher/applier/action_plan/default.py index e369cb76b..54919c3d7 100644 --- a/watcher/applier/action_plan/default.py +++ b/watcher/applier/action_plan/default.py @@ -28,11 +28,11 @@ LOG = log.getLogger(__name__) class DefaultActionPlanHandler(base.BaseActionPlanHandler): - def __init__(self, context, applier_manager, action_plan_uuid): + def __init__(self, context, service, action_plan_uuid): super(DefaultActionPlanHandler, self).__init__() self.ctx = context + self.service = service self.action_plan_uuid = action_plan_uuid - self.applier_manager = applier_manager def notify(self, uuid, event_type, state): action_plan = ap_objects.ActionPlan.get_by_uuid(self.ctx, uuid) @@ -43,8 +43,7 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler): ev.data = {} payload = {'action_plan__uuid': uuid, 'action_plan_state': state} - self.applier_manager.status_topic_handler.publish_event( - ev.type.name, payload) + self.service.publish_status_event(ev.type.name, payload) def execute(self): try: @@ -52,10 +51,9 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler): self.notify(self.action_plan_uuid, event_types.EventTypes.LAUNCH_ACTION_PLAN, ap_objects.State.ONGOING) - applier = default.DefaultApplier(self.ctx, self.applier_manager) + applier = default.DefaultApplier(self.ctx, self.service) applier.execute(self.action_plan_uuid) state = ap_objects.State.SUCCEEDED - except Exception as e: LOG.exception(e) state = ap_objects.State.FAILED diff --git a/watcher/applier/manager.py b/watcher/applier/manager.py index 82b9be179..15c00f1a7 100644 --- a/watcher/applier/manager.py +++ b/watcher/applier/manager.py @@ -68,6 +68,8 @@ class ApplierManager(object): conductor_endpoints = [trigger.TriggerActionPlan] status_endpoints = [] + notification_endpoints = [] + notification_topics = [] def __init__(self): self.publisher_id = CONF.watcher_applier.publisher_id diff --git a/watcher/applier/rpcapi.py b/watcher/applier/rpcapi.py index 9eac24de1..f0be3ab35 100644 --- a/watcher/applier/rpcapi.py +++ b/watcher/applier/rpcapi.py @@ -21,7 +21,6 @@ from oslo_log import log from watcher.applier import manager from watcher.common import exception -from watcher.common.messaging import notification_handler as notification from watcher.common import service from watcher.common import utils @@ -51,7 +50,9 @@ class ApplierAPIManager(object): API_VERSION = '1.0' conductor_endpoints = [] - status_endpoints = [notification.NotificationHandler] + status_endpoints = [] + notification_endpoints = [] + notification_topics = [] def __init__(self): self.publisher_id = CONF.watcher_applier.publisher_id diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 666bfdd16..1c614183c 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -82,8 +82,7 @@ class BaseWorkFlowEngine(loadable.Loadable): ev.data = {} payload = {'action_uuid': action.uuid, 'action_state': state} - self.applier_manager.status_topic_handler.publish_event( - ev.type.name, payload) + self.applier_manager.publish_status_event(ev.type.name, payload) @abc.abstractmethod def execute(self, actions): diff --git a/watcher/common/context.py b/watcher/common/context.py index 0c48a5ade..243f14850 100644 --- a/watcher/common/context.py +++ b/watcher/common/context.py @@ -11,16 +11,24 @@ # under the License. from oslo_context import context +from oslo_log import log as logging +from oslo_utils import timeutils +import six + +from watcher._i18n import _LW +from watcher.common import utils + +LOG = logging.getLogger(__name__) class RequestContext(context.RequestContext): """Extends security contexts from the OpenStack common library.""" - def __init__(self, auth_token=None, auth_url=None, domain_id=None, - domain_name=None, user=None, user_id=None, project=None, - project_id=None, is_admin=False, is_public_api=False, - read_only=False, show_deleted=False, request_id=None, - trust_id=None, auth_token_info=None, roles=None): + def __init__(self, user_id=None, project_id=None, is_admin=None, + roles=None, timestamp=None, request_id=None, auth_token=None, + auth_url=None, overwrite=True, user_name=None, + project_name=None, domain_name=None, domain_id=None, + auth_token_info=None, **kwargs): """Stores several additional request parameters: :param domain_id: The ID of the domain. @@ -29,46 +37,84 @@ class RequestContext(context.RequestContext): without authentication. """ - super(RequestContext, self).__init__(auth_token=auth_token, - user=user, tenant=project, - is_admin=is_admin, - read_only=read_only, - show_deleted=show_deleted, - request_id=request_id, - roles=roles) + user = kwargs.pop('user', None) + tenant = kwargs.pop('tenant', None) + super(RequestContext, self).__init__( + auth_token=auth_token, + user=user_id or user, + tenant=project_id or tenant, + domain=kwargs.pop('domain', None) or domain_name or domain_id, + user_domain=kwargs.pop('user_domain', None), + project_domain=kwargs.pop('project_domain', None), + is_admin=is_admin, + read_only=kwargs.pop('read_only', False), + show_deleted=kwargs.pop('show_deleted', False), + request_id=request_id, + resource_uuid=kwargs.pop('resource_uuid', None), + is_admin_project=kwargs.pop('is_admin_project', None), + overwrite=overwrite, + roles=roles) - self.is_public_api = is_public_api - self.user_id = user_id - self.project = project - self.project_id = project_id - self.domain_id = domain_id - self.domain_name = domain_name + self.remote_address = kwargs.pop('remote_address', None) + self.instance_lock_checked = kwargs.pop('instance_lock_checked', None) + self.read_deleted = kwargs.pop('read_deleted', None) + self.service_catalog = kwargs.pop('service_catalog', None) + self.quota_class = kwargs.pop('quota_class', None) + + # oslo_context's RequestContext.to_dict() generates this field, we can + # safely ignore this as we don't use it. + kwargs.pop('user_identity', None) + if kwargs: + LOG.warning(_LW('Arguments dropped when creating context: %s'), + str(kwargs)) + + # FIXME(dims): user_id and project_id duplicate information that is + # already present in the oslo_context's RequestContext. We need to + # get rid of them. self.auth_url = auth_url + self.domain_name = domain_name + self.domain_id = domain_id self.auth_token_info = auth_token_info - self.trust_id = trust_id + self.user_id = user_id + self.project_id = project_id + if not timestamp: + timestamp = timeutils.utcnow() + if isinstance(timestamp, six.string_types): + timestamp = timeutils.parse_isotime(timestamp) + self.timestamp = timestamp + self.user_name = user_name + self.project_name = project_name + self.is_admin = is_admin + # if self.is_admin is None: + # self.is_admin = policy.check_is_admin(self) def to_dict(self): - return {'auth_token': self.auth_token, - 'auth_url': self.auth_url, - 'domain_id': self.domain_id, - 'domain_name': self.domain_name, - 'user': self.user, - 'user_id': self.user_id, - 'project': self.project, - 'project_id': self.project_id, - 'is_admin': self.is_admin, - 'is_public_api': self.is_public_api, - 'read_only': self.read_only, - 'show_deleted': self.show_deleted, - 'request_id': self.request_id, - 'trust_id': self.trust_id, - 'auth_token_info': self.auth_token_info, - 'roles': self.roles} + values = super(RequestContext, self).to_dict() + # FIXME(dims): defensive hasattr() checks need to be + # removed once we figure out why we are seeing stack + # traces + values.update({ + 'user_id': getattr(self, 'user_id', None), + 'user_name': getattr(self, 'user_name', None), + 'project_id': getattr(self, 'project_id', None), + 'project_name': getattr(self, 'project_name', None), + 'domain_id': getattr(self, 'domain_id', None), + 'domain_name': getattr(self, 'domain_name', None), + 'auth_token_info': getattr(self, 'auth_token_info', None), + 'is_admin': getattr(self, 'is_admin', None), + 'timestamp': utils.strtime(self.timestamp) if hasattr( + self, 'timestamp') else None, + 'request_id': getattr(self, 'request_id', None), + }) + return values @classmethod def from_dict(cls, values): return cls(**values) + def __str__(self): + return "" % self.to_dict() + def make_context(*args, **kwargs): return RequestContext(*args, **kwargs) diff --git a/watcher/common/exception.py b/watcher/common/exception.py index 142941618..f14faf016 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -361,7 +361,7 @@ class InstanceNotFound(WatcherException): class ComputeNodeNotFound(WatcherException): - msg_fmt = _("The compute node %s could not be found") + msg_fmt = _("The compute node %(name)s could not be found") class LoadingError(WatcherException): diff --git a/watcher/common/messaging/messaging_handler.py b/watcher/common/messaging/messaging_handler.py index e9dfb0617..59093ca6c 100644 --- a/watcher/common/messaging/messaging_handler.py +++ b/watcher/common/messaging/messaging_handler.py @@ -110,7 +110,6 @@ class MessagingHandler(threading.Thread): def stop(self): LOG.debug('Stopped server') - self.__server.wait() self.__server.stop() def publish_event(self, event_type, payload, request_id=None): diff --git a/watcher/common/nova_helper.py b/watcher/common/nova_helper.py index 6b2e78113..f56323c2f 100644 --- a/watcher/common/nova_helper.py +++ b/watcher/common/nova_helper.py @@ -93,7 +93,6 @@ class NovaHelper(object): used as the name of the intermediate image used for migration. If this flag is False, a temporary image name is built """ - new_image_name = "" LOG.debug( @@ -298,7 +297,6 @@ class NovaHelper(object): :param dest_hostname: the name of the destination compute node. :param block_migration: No shared storage is required. """ - LOG.debug("Trying a live migrate of instance %s to host '%s'" % ( instance_id, dest_hostname)) @@ -458,7 +456,6 @@ class NovaHelper(object): :param instance_id: the unique id of the instance to delete. """ - LOG.debug("Trying to remove instance %s ..." % instance_id) instance = self.find_instance(instance_id) @@ -476,7 +473,6 @@ class NovaHelper(object): :param instance_id: the unique id of the instance to stop. """ - LOG.debug("Trying to stop instance %s ..." % instance_id) instance = self.find_instance(instance_id) @@ -504,7 +500,6 @@ class NovaHelper(object): :param retry: how many times to retry :param sleep: seconds to sleep between the retries """ - if not server: return False @@ -526,7 +521,6 @@ class NovaHelper(object): :param retry: how many times to retry :param sleep: seconds to sleep between the retries """ - if not instance: return False @@ -550,7 +544,6 @@ class NovaHelper(object): it with the new instance It returns the unique id of the created instance. """ - LOG.debug( "Trying to create new instance '%s' " "from image '%s' with flavor '%s' ..." % ( diff --git a/watcher/common/rpc.py b/watcher/common/rpc.py index 42a925eb7..eee2d6014 100644 --- a/watcher/common/rpc.py +++ b/watcher/common/rpc.py @@ -13,11 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. - from oslo_config import cfg +from oslo_log import log import oslo_messaging as messaging -from oslo_serialization import jsonutils +from watcher._i18n import _LE from watcher.common import context as watcher_context from watcher.common import exception @@ -36,7 +36,9 @@ __all__ = [ ] CONF = cfg.CONF +LOG = log.getLogger(__name__) TRANSPORT = None +NOTIFICATION_TRANSPORT = None NOTIFIER = None ALLOWED_EXMODS = [ @@ -55,23 +57,36 @@ TRANSPORT_ALIASES = { 'watcher.rpc.impl_zmq': 'zmq', } +JsonPayloadSerializer = messaging.JsonPayloadSerializer + def init(conf): - global TRANSPORT, NOTIFIER + global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER exmods = get_allowed_exmods() TRANSPORT = messaging.get_transport(conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES) + NOTIFICATION_TRANSPORT = messaging.get_notification_transport( + conf, + allowed_remote_exmods=exmods, + aliases=TRANSPORT_ALIASES) + serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) + NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, + serializer=serializer) + + +def initialized(): + return None not in [TRANSPORT, NOTIFIER] def cleanup(): - global TRANSPORT, NOTIFIER - assert TRANSPORT is not None - assert NOTIFIER is not None + global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER + if NOTIFIER is None: + LOG.exception(_LE("RPC cleanup: NOTIFIER is None")) TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None + NOTIFICATION_TRANSPORT.cleanup() + TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None def set_defaults(control_exchange): @@ -90,12 +105,6 @@ def get_allowed_exmods(): return ALLOWED_EXMODS + EXTRA_EXMODS -class JsonPayloadSerializer(messaging.NoOpSerializer): - @staticmethod - def serialize_entity(context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - class RequestContextSerializer(messaging.Serializer): def __init__(self, base): @@ -118,10 +127,6 @@ class RequestContextSerializer(messaging.Serializer): return watcher_context.RequestContext.from_dict(context) -def get_transport_url(url_str=None): - return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES) - - def get_client(target, version_cap=None, serializer=None): assert TRANSPORT is not None serializer = RequestContextSerializer(serializer) diff --git a/watcher/common/service.py b/watcher/common/service.py index 6725cdab4..c29290cc2 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -27,7 +27,7 @@ from oslo_reports import opts as gmr_opts from oslo_service import service from oslo_service import wsgi -from watcher._i18n import _ +from watcher._i18n import _, _LI from watcher.api import app from watcher.common import config from watcher.common.messaging.events import event_dispatcher as dispatcher @@ -111,8 +111,10 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.publisher_id = self.manager.publisher_id self.api_version = self.manager.API_VERSION + self.conductor_topic = self.manager.conductor_topic self.status_topic = self.manager.status_topic + self.notification_topics = self.manager.notification_topics self.conductor_endpoints = [ ep(self) for ep in self.manager.conductor_endpoints @@ -120,28 +122,52 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.status_endpoints = [ ep(self.publisher_id) for ep in self.manager.status_endpoints ] + self.notification_endpoints = self.manager.notification_endpoints self.serializer = rpc.RequestContextSerializer( base.WatcherObjectSerializer()) - self.conductor_topic_handler = self.build_topic_handler( - self.conductor_topic, self.conductor_endpoints) - self.status_topic_handler = self.build_topic_handler( - self.status_topic, self.status_endpoints) - + self._transport = None + self._notification_transport = None self._conductor_client = None self._status_client = None + self.conductor_topic_handler = None + self.status_topic_handler = None + self.notification_handler = None + + if self.conductor_topic and self.conductor_endpoints: + self.conductor_topic_handler = self.build_topic_handler( + self.conductor_topic, self.conductor_endpoints) + if self.status_topic and self.status_endpoints: + self.status_topic_handler = self.build_topic_handler( + self.status_topic, self.status_endpoints) + if self.notification_topics and self.notification_endpoints: + self.notification_handler = self.build_notification_handler( + self.notification_topics, self.notification_endpoints + ) + + @property + def transport(self): + if self._transport is None: + self._transport = om.get_transport(CONF) + return self._transport + + @property + def notification_transport(self): + if self._notification_transport is None: + self._notification_transport = om.get_notification_transport(CONF) + return self._notification_transport + @property def conductor_client(self): if self._conductor_client is None: - transport = om.get_transport(CONF) target = om.Target( topic=self.conductor_topic, version=self.API_VERSION, ) self._conductor_client = om.RPCClient( - transport, target, serializer=self.serializer) + self.transport, target, serializer=self.serializer) return self._conductor_client @conductor_client.setter @@ -151,13 +177,12 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): @property def status_client(self): if self._status_client is None: - transport = om.get_transport(CONF) target = om.Target( topic=self.status_topic, version=self.API_VERSION, ) self._status_client = om.RPCClient( - transport, target, serializer=self.serializer) + self.transport, target, serializer=self.serializer) return self._status_client @status_client.setter @@ -169,17 +194,33 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.publisher_id, topic_name, [self.manager] + list(endpoints), self.api_version, self.serializer) + def build_notification_handler(self, topic_names, endpoints=()): + serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer()) + targets = [om.Target(topic=topic_name) for topic_name in topic_names] + return om.get_notification_listener( + self.notification_transport, targets, endpoints, + executor='eventlet', serializer=serializer, + allow_requeue=False) + def start(self): LOG.debug("Connecting to '%s' (%s)", CONF.transport_url, CONF.rpc_backend) - self.conductor_topic_handler.start() - self.status_topic_handler.start() + if self.conductor_topic_handler: + self.conductor_topic_handler.start() + if self.status_topic_handler: + self.status_topic_handler.start() + if self.notification_handler: + self.notification_handler.start() def stop(self): LOG.debug("Disconnecting from '%s' (%s)", CONF.transport_url, CONF.rpc_backend) - self.conductor_topic_handler.stop() - self.status_topic_handler.stop() + if self.conductor_topic_handler: + self.conductor_topic_handler.stop() + if self.status_topic_handler: + self.status_topic_handler.stop() + if self.notification_handler: + self.notification_handler.stop() def reset(self): """Reset a service in case it received a SIGHUP.""" @@ -190,9 +231,14 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): def publish_control(self, event, payload): return self.conductor_topic_handler.publish_event(event, payload) - def publish_status(self, event, payload, request_id=None): - return self.status_topic_handler.publish_event( - event, payload, request_id) + def publish_status_event(self, event, payload, request_id=None): + if self.status_topic_handler: + return self.status_topic_handler.publish_event( + event, payload, request_id) + else: + LOG.info( + _LI("No status notifier declared: notification '%s' not sent"), + event) def get_version(self): return self.api_version @@ -208,7 +254,7 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): 'request_id': ctx['request_id'], 'msg': message } - self.publish_status(evt, payload) + self.publish_status_event(evt, payload) def launch(conf, service_, workers=1, restart_method='reload'): diff --git a/watcher/common/utils.py b/watcher/common/utils.py index 04b487155..6751d0dd1 100644 --- a/watcher/common/utils.py +++ b/watcher/common/utils.py @@ -132,6 +132,10 @@ def get_cls_import_path(cls): return module + '.' + cls.__name__ +def strtime(at): + return at.strftime("%Y-%m-%dT%H:%M:%S.%f") + + # Default value feedback extension as jsonschema doesn't support it def extend_with_default(validator_class): validate_properties = validator_class.VALIDATORS["properties"] diff --git a/watcher/decision_engine/audit/base.py b/watcher/decision_engine/audit/base.py index e44d6fded..b857c7678 100644 --- a/watcher/decision_engine/audit/base.py +++ b/watcher/decision_engine/audit/base.py @@ -78,8 +78,7 @@ class AuditHandler(BaseAuditHandler): event.data = {} payload = {'audit_uuid': audit_uuid, 'audit_status': status} - self.messaging.status_topic_handler.publish_event( - event.type.name, payload) + self.messaging.publish_status_event(event.type.name, payload) def update_audit_state(self, request_context, audit, state): LOG.debug("Update audit state: %s", state) diff --git a/watcher/decision_engine/manager.py b/watcher/decision_engine/manager.py index e09af3b57..774bd2b34 100644 --- a/watcher/decision_engine/manager.py +++ b/watcher/decision_engine/manager.py @@ -15,7 +15,6 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -# """ This component is responsible for computing a set of potential optimization @@ -40,6 +39,7 @@ See :doc:`../architecture` for more details on this component. from oslo_config import cfg from watcher.decision_engine.messaging import audit_endpoint +from watcher.decision_engine.model.collector import manager CONF = cfg.CONF @@ -57,6 +57,10 @@ WATCHER_DECISION_ENGINE_OPTS = [ 'is used so as to notify' 'the others components ' 'of the system'), + cfg.ListOpt('notification_topics', + default=['versioned_notifications', 'watcher_notifications'], + help='The topic names from which notification events ' + 'will be listened to'), cfg.StrOpt('publisher_id', default='watcher.decision.api', help='The identifier used by the Watcher ' @@ -65,8 +69,7 @@ WATCHER_DECISION_ENGINE_OPTS = [ default=2, required=True, help='The maximum number of threads that can be used to ' - 'execute strategies', - ), + 'execute strategies'), ] decision_engine_opt_group = cfg.OptGroup(name='watcher_decision_engine', title='Defines the parameters of ' @@ -79,11 +82,19 @@ class DecisionEngineManager(object): API_VERSION = '1.0' - conductor_endpoints = [audit_endpoint.AuditEndpoint] - status_endpoints = [] - def __init__(self): + self.api_version = self.API_VERSION + self.publisher_id = CONF.watcher_decision_engine.publisher_id self.conductor_topic = CONF.watcher_decision_engine.conductor_topic self.status_topic = CONF.watcher_decision_engine.status_topic - self.api_version = self.API_VERSION + self.notification_topics = ( + CONF.watcher_decision_engine.notification_topics) + + self.conductor_endpoints = [audit_endpoint.AuditEndpoint] + + self.status_endpoints = [] + + self.collector_manager = manager.CollectorManager() + self.notification_endpoints = ( + self.collector_manager.get_notification_endpoints()) diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index a5c5f836b..d6cab9354 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -30,6 +30,7 @@ LOG = log.getLogger(__name__) class AuditEndpoint(object): + def __init__(self, messaging): self._messaging = messaging self._executor = futures.ThreadPoolExecutor( diff --git a/watcher/decision_engine/model/collector/base.py b/watcher/decision_engine/model/collector/base.py index 7575a5f8f..18f62e5ff 100644 --- a/watcher/decision_engine/model/collector/base.py +++ b/watcher/decision_engine/model/collector/base.py @@ -139,6 +139,15 @@ class BaseClusterDataModelCollector(loadable.LoadableSingleton): self._cluster_data_model = model self.lock.release() + @abc.abstractproperty + def notification_endpoints(self): + """Associated notification endpoints + + :return: Associated notification endpoints + :rtype: List of :py:class:`~.EventsNotificationEndpoint` instances + """ + raise NotImplementedError() + def set_cluster_data_model_as_stale(self): self.cluster_data_model = self.STALE_MODEL diff --git a/watcher/decision_engine/model/collector/manager.py b/watcher/decision_engine/model/collector/manager.py index 05ba8f3a7..cfa75ae09 100644 --- a/watcher/decision_engine/model/collector/manager.py +++ b/watcher/decision_engine/model/collector/manager.py @@ -31,6 +31,7 @@ class CollectorManager(object): def __init__(self): self.collector_loader = default.ClusterDataModelCollectorLoader() self._collectors = None + self._notification_endpoints = None def get_collectors(self): if self._collectors is None: @@ -43,6 +44,15 @@ class CollectorManager(object): return self._collectors + def get_notification_endpoints(self): + if self._notification_endpoints is None: + endpoints = [] + for collector in self.get_collectors().values(): + endpoints.extend(collector.notification_endpoints) + self._notification_endpoints = endpoints + + return self._notification_endpoints + def get_cluster_model_collector(self, name, osc=None): """Retrieve cluster data model collector diff --git a/watcher/decision_engine/model/collector/nova.py b/watcher/decision_engine/model/collector/nova.py index 699fb9ab4..b9d192776 100644 --- a/watcher/decision_engine/model/collector/nova.py +++ b/watcher/decision_engine/model/collector/nova.py @@ -22,6 +22,7 @@ from watcher.common import nova_helper from watcher.decision_engine.model.collector import base from watcher.decision_engine.model import element from watcher.decision_engine.model import model_root +from watcher.decision_engine.model.notification import nova LOG = log.getLogger(__name__) @@ -43,6 +44,26 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector): super(NovaClusterDataModelCollector, self).__init__(config, osc) self.wrapper = nova_helper.NovaHelper(osc=self.osc) + @property + def notification_endpoints(self): + """Associated notification endpoints + + :return: Associated notification endpoints + :rtype: List of :py:class:`~.EventsNotificationEndpoint` instances + """ + return [ + nova.ServiceUpdated(self), + + nova.InstanceCreated(self), + nova.InstanceUpdated(self), + nova.InstanceDeletedEnd(self), + + nova.LegacyInstanceCreatedEnd(self), + nova.LegacyInstanceUpdated(self), + nova.LegacyInstanceDeletedEnd(self), + nova.LegacyLiveMigratedEnd(self), + ] + def execute(self): """Build the compute cluster data model""" LOG.debug("Building latest Nova cluster data model") @@ -87,7 +108,6 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector): disk.set_capacity(instance, v.flavor['disk']) num_cores.set_capacity(instance, v.flavor['vcpus']) - model.get_mapping().map(node, instance) - model.add_instance(instance) + model.map_instance(instance, node) return model diff --git a/watcher/decision_engine/model/element/__init__.py b/watcher/decision_engine/model/element/__init__.py index 30d242e3d..51ec0302a 100644 --- a/watcher/decision_engine/model/element/__init__.py +++ b/watcher/decision_engine/model/element/__init__.py @@ -22,7 +22,6 @@ from watcher.decision_engine.model.element import node from watcher.decision_engine.model.element import resource ServiceState = node.ServiceState -PowerState = node.PowerState ComputeNode = node.ComputeNode InstanceState = instance.InstanceState @@ -35,5 +34,5 @@ Resource = resource.Resource __all__ = [ - 'ServiceState', 'PowerState', 'ComputeNode', 'InstanceState', 'Instance', + 'ServiceState', 'ComputeNode', 'InstanceState', 'Instance', 'DiskInfo', 'ResourceType', 'Resource'] diff --git a/watcher/decision_engine/model/element/node.py b/watcher/decision_engine/model/element/node.py index 08dee647f..edc76fa36 100644 --- a/watcher/decision_engine/model/element/node.py +++ b/watcher/decision_engine/model/element/node.py @@ -26,31 +26,12 @@ class ServiceState(enum.Enum): DISABLED = 'disabled' -class PowerState(enum.Enum): - # away mode - g0 = "g0" - # power on suspend (processor caches are flushed) - # The power to the CPU(s) and RAM is maintained - g1_S1 = "g1_S1" - # CPU powered off. Dirty cache is flushed to RAM - g1_S2 = "g1_S2" - # Suspend to RAM - g1_S3 = "g1_S3" - # Suspend to Disk - g1_S4 = "g1_S4" - # switch outlet X OFF on the PDU (Power Distribution Unit) - switch_off = "switch_off" - # switch outlet X ON on the PDU (Power Distribution Unit) - switch_on = "switch_on" - - class ComputeNode(compute_resource.ComputeResource): def __init__(self): super(ComputeNode, self).__init__() - self._state = ServiceState.ONLINE - self._status = ServiceState.ENABLED - self._power_state = PowerState.g0 + self._state = ServiceState.ONLINE.value + self._status = ServiceState.ENABLED.value def accept(self, visitor): raise NotImplementedError() @@ -70,11 +51,3 @@ class ComputeNode(compute_resource.ComputeResource): @status.setter def status(self, s): self._status = s - - @property - def powerstate(self): - return self._power_state - - @powerstate.setter - def powerstate(self, p): - self._power_state = p diff --git a/watcher/decision_engine/model/element/resource.py b/watcher/decision_engine/model/element/resource.py index 2854132bc..9becea97a 100644 --- a/watcher/decision_engine/model/element/resource.py +++ b/watcher/decision_engine/model/element/resource.py @@ -47,6 +47,9 @@ class Resource(object): def set_capacity(self, element, value): self.mapping[element.uuid] = value + def unset_capacity(self, element): + del self.mapping[element.uuid] + def get_capacity_from_id(self, uuid): if str(uuid) in self.mapping.keys(): return self.mapping[str(uuid)] diff --git a/watcher/decision_engine/model/mapping.py b/watcher/decision_engine/model/mapping.py index 72470dd40..d86b97c6c 100644 --- a/watcher/decision_engine/model/mapping.py +++ b/watcher/decision_engine/model/mapping.py @@ -36,17 +36,15 @@ class Mapping(object): :param node: the node :param instance: the virtual machine or instance """ - try: self.lock.acquire() # init first if node.uuid not in self.compute_node_mapping.keys(): - self.compute_node_mapping[node.uuid] = [] + self.compute_node_mapping[node.uuid] = set() # map node => instances - self.compute_node_mapping[node.uuid].append( - instance.uuid) + self.compute_node_mapping[node.uuid].add(instance.uuid) # map instance => node self.instance_mapping[instance.uuid] = node.uuid @@ -60,7 +58,6 @@ class Mapping(object): :param node: the node :param instance: the virtual machine or instance """ - self.unmap_from_id(node.uuid, instance.uuid) def unmap_from_id(self, node_uuid, instance_uuid): @@ -68,7 +65,6 @@ class Mapping(object): :rtype : object """ - try: self.lock.acquire() if str(node_uuid) in self.compute_node_mapping: @@ -77,9 +73,9 @@ class Mapping(object): # remove instance self.instance_mapping.pop(instance_uuid) else: - LOG.warning(_LW( - "Trying to delete the instance %(instance)s but it was " - "not found on node %(node)s"), + LOG.warning( + _LW("Trying to delete the instance %(instance)s but it " + "was not found on node %(node)s") % {'instance': instance_uuid, 'node': node_uuid}) finally: self.lock.release() @@ -96,7 +92,6 @@ class Mapping(object): :param instance: the uuid of the instance :return: node """ - return self.model.get_node_from_id( self.instance_mapping[str(instance_uuid)]) @@ -113,21 +108,4 @@ class Mapping(object): return self.compute_node_mapping[str(node_uuid)] else: # empty - return [] - - def migrate_instance(self, instance, source_node, destination_node): - """Migrate single instance from source_node to destination_node - - :param instance: - :param source_node: - :param destination_node: - :return: - """ - - if source_node == destination_node: - return False - # unmap - self.unmap(source_node, instance) - # map - self.map(destination_node, instance) - return True + return set() diff --git a/watcher/decision_engine/model/model_root.py b/watcher/decision_engine/model/model_root.py index 5872d392e..4f4886aac 100644 --- a/watcher/decision_engine/model/model_root.py +++ b/watcher/decision_engine/model/model_root.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six + from watcher._i18n import _ from watcher.common import exception from watcher.common import utils @@ -51,7 +53,7 @@ class ModelRoot(object): def remove_node(self, node): self.assert_node(node) if str(node.uuid) not in self._nodes: - raise exception.ComputeNodeNotFound(node.uuid) + raise exception.ComputeNodeNotFound(name=node.uuid) else: del self._nodes[node.uuid] @@ -59,12 +61,75 @@ class ModelRoot(object): self.assert_instance(instance) self._instances[instance.uuid] = instance + def remove_instance(self, instance): + self.assert_instance(instance) + del self._instances[instance.uuid] + + def map_instance(self, instance, node): + """Map a newly created instance to a node + + :param instance: :py:class:`~.Instance` object or instance UUID + :type instance: str or :py:class:`~.Instance` + :param node: :py:class:`~.ComputeNode` object or node UUID + :type node: str or :py:class:`~.Instance` + """ + if isinstance(instance, six.string_types): + instance = self.get_instance_from_id(instance) + if isinstance(node, six.string_types): + node = self.get_node_from_id(node) + + self.add_instance(instance) + self.mapping.map(node, instance) + + def unmap_instance(self, instance, node): + """Unmap an instance from a node + + :param instance: :py:class:`~.Instance` object or instance UUID + :type instance: str or :py:class:`~.Instance` + :param node: :py:class:`~.ComputeNode` object or node UUID + :type node: str or :py:class:`~.Instance` + """ + if isinstance(instance, six.string_types): + instance = self.get_instance_from_id(instance) + if isinstance(node, six.string_types): + node = self.get_node_from_id(node) + + self.add_instance(instance) + self.mapping.unmap(node, instance) + + def delete_instance(self, instance, node): + self.remove_instance(instance) + + self.mapping.unmap(node, instance) + + for resource in self.resource.values(): + try: + resource.unset_capacity(instance) + except KeyError: + pass + + def migrate_instance(self, instance, source_node, destination_node): + """Migrate single instance from source_node to destination_node + + :param instance: + :param source_node: + :param destination_node: + :return: + """ + if source_node == destination_node: + return False + # unmap + self.mapping.unmap(source_node, instance) + # map + self.mapping.map(destination_node, instance) + return True + def get_all_compute_nodes(self): return self._nodes def get_node_from_id(self, node_uuid): if str(node_uuid) not in self._nodes: - raise exception.ComputeNodeNotFound(node_uuid) + raise exception.ComputeNodeNotFound(name=node_uuid) return self._nodes[str(node_uuid)] def get_instance_from_id(self, uuid): @@ -72,6 +137,17 @@ class ModelRoot(object): raise exception.InstanceNotFound(name=uuid) return self._instances[str(uuid)] + def get_node_from_instance_id(self, instance_uuid): + """Getting host information from the guest instance + + :param instance_uuid: the uuid of the instance + :return: node + """ + if str(instance_uuid) not in self.mapping.instance_mapping: + raise exception.InstanceNotFound(name=instance_uuid) + return self.get_node_from_id( + self.mapping.instance_mapping[str(instance_uuid)]) + def get_all_instances(self): return self._instances diff --git a/watcher/decision_engine/model/notification/__init__.py b/watcher/decision_engine/model/notification/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/decision_engine/model/notification/base.py b/watcher/decision_engine/model/notification/base.py new file mode 100644 index 000000000..6dd30c79a --- /dev/null +++ b/watcher/decision_engine/model/notification/base.py @@ -0,0 +1,51 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + +from oslo_log import log + +from watcher.common import rpc + +LOG = log.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class NotificationEndpoint(object): + + def __init__(self, collector): + super(NotificationEndpoint, self).__init__() + self.collector = collector + self._notifier = None + + @abc.abstractproperty + def filter_rule(self): + """Notification Filter""" + raise NotImplementedError() + + @property + def cluster_data_model(self): + return self.collector.cluster_data_model + + @property + def notifier(self): + if self._notifier is None: + self._notifier = rpc.get_notifier('decision-engine') + + return self._notifier diff --git a/watcher/decision_engine/model/notification/filtering.py b/watcher/decision_engine/model/notification/filtering.py new file mode 100644 index 000000000..e0d7349d4 --- /dev/null +++ b/watcher/decision_engine/model/notification/filtering.py @@ -0,0 +1,91 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +from oslo_log import log +import oslo_messaging as om +import six + +LOG = log.getLogger(__name__) + + +class NotificationFilter(om.NotificationFilter): + """Notification Endpoint base class + + This class is responsible for handling incoming notifications. Depending + on the priority level of the incoming, you may need to implement one or + more of the following methods: + + .. code: py + def audit(self, ctxt, publisher_id, event_type, payload, metadata): + do_something(payload) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + do_something(payload) + + def warn(self, ctxt, publisher_id, event_type, payload, metadata): + do_something(payload) + + def error(self, ctxt, publisher_id, event_type, payload, metadata): + do_something(payload) + + def critical(self, ctxt, publisher_id, event_type, payload, metadata): + do_something(payload) + """ + + def _build_regex_dict(self, regex_list): + if regex_list is None: + return {} + + regex_mapping = {} + for key, value in regex_list.items(): + if isinstance(value, dict): + regex_mapping[key] = self._build_regex_dict(value) + else: + if callable(value): + regex_mapping[key] = value + elif value is not None: + regex_mapping[key] = re.compile(value) + else: + regex_mapping[key] = None + + return regex_mapping + + def _check_for_mismatch(self, data, regex): + if isinstance(regex, dict): + mismatch_results = [ + k not in data or not self._check_for_mismatch(data[k], v) + for k, v in regex.items() + ] + if not mismatch_results: + return False + + return all(mismatch_results) + elif callable(regex): + # The filter is a callable that should return True + # if there is a mismatch + return regex(data) + elif regex is not None and data is None: + return True + elif (regex is not None and + isinstance(data, six.string_types) and + not regex.match(data)): + return True + + return False diff --git a/watcher/decision_engine/model/notification/nova.py b/watcher/decision_engine/model/notification/nova.py new file mode 100644 index 000000000..4e2ba44e1 --- /dev/null +++ b/watcher/decision_engine/model/notification/nova.py @@ -0,0 +1,334 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log + +from watcher._i18n import _LI +from watcher.common import exception +from watcher.decision_engine.model import element +from watcher.decision_engine.model.notification import base +from watcher.decision_engine.model.notification import filtering + +LOG = log.getLogger(__name__) + + +class NovaNotification(base.NotificationEndpoint): + + def get_or_create_instance(self, uuid): + try: + instance = self.cluster_data_model.get_instance_from_id(uuid) + except exception.InstanceNotFound: + # The instance didn't exist yet so we create a new instance object + LOG.debug("New instance created: %s", uuid) + instance = element.Instance() + instance.uuid = uuid + + self.cluster_data_model.add_instance(instance) + + return instance + + def update_instance(self, instance, data): + instance_data = data['nova_object.data'] + instance_flavor_data = instance_data['flavor']['nova_object.data'] + + instance.state = instance_data['state'] + instance.hostname = instance_data['host_name'] + instance.human_id = instance_data['display_name'] + + memory_mb = instance_flavor_data['memory_mb'] + num_cores = instance_flavor_data['vcpus'] + disk_gb = instance_flavor_data['root_gb'] + + self.update_capacity(element.ResourceType.memory, instance, memory_mb) + self.update_capacity( + element.ResourceType.cpu_cores, instance, num_cores) + self.update_capacity( + element.ResourceType.disk, instance, disk_gb) + + node = self.get_or_create_node(instance_data['host']) + + self.update_instance_mapping(instance, node) + + def update_capacity(self, resource_id, obj, value): + resource = self.cluster_data_model.get_resource_from_id(resource_id) + resource.set_capacity(obj, value) + + def legacy_update_instance(self, instance, data): + instance.state = data['state'] + instance.hostname = data['hostname'] + instance.human_id = data['display_name'] + + memory_mb = data['memory_mb'] + num_cores = data['vcpus'] + disk_gb = data['root_gb'] + + self.update_capacity(element.ResourceType.memory, instance, memory_mb) + self.update_capacity( + element.ResourceType.cpu_cores, instance, num_cores) + self.update_capacity( + element.ResourceType.disk, instance, disk_gb) + + node = self.get_or_create_node(data['host']) + + self.update_instance_mapping(instance, node) + + def get_or_create_node(self, uuid): + if uuid is None: + LOG.debug("Compute node UUID not provided: skipping") + return + try: + node = self.cluster_data_model.get_node_from_id(uuid) + except exception.ComputeNodeNotFound: + # The node didn't exist yet so we create a new node object + LOG.debug("New compute node created: %s", uuid) + node = element.ComputeNode() + node.uuid = uuid + + self.cluster_data_model.add_node(node) + + return node + + def update_instance_mapping(self, instance, node): + if not node: + LOG.debug("Instance %s not yet attached to any node: skipping", + instance.uuid) + return + try: + old_node = self.get_or_create_node(node.uuid) + LOG.debug("Mapped node %s found", node.uuid) + if node and node != old_node: + LOG.debug("Unmapping instance %s from %s", + instance.uuid, node.uuid) + self.cluster_data_model.unmap_instance(instance, old_node) + except exception.InstanceNotFound: + # The instance didn't exist yet so we map it for the first time + LOG.debug("New instance: mapping it to %s", node.uuid) + finally: + if node: + self.cluster_data_model.map_instance(instance, node) + LOG.debug("Mapped instance %s to %s", instance.uuid, node.uuid) + + def delete_instance(self, instance, node): + try: + self.cluster_data_model.delete_instance(instance, node) + except Exception as exc: + LOG.exception(exc) + LOG.info(_LI("Instance %s already deleted"), instance.uuid) + + +class VersionnedNotificationEndpoint(NovaNotification): + publisher_id_regex = r'^nova-compute.*' + + +class UnversionnedNotificationEndpoint(NovaNotification): + publisher_id_regex = r'^compute.*' + + +class ServiceUpdated(VersionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova service.update notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='service.update', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + node_data = payload['nova_object.data'] + node_uuid = node_data['host'] + node = self.get_or_create_node(node_uuid) + + node.hostname = node_data['host'] + node.state = ( + element.ServiceState.OFFLINE.value + if node_data['forced_down'] else element.ServiceState.ONLINE.value) + node.status = ( + element.ServiceState.DISABLED.value + if node_data['host'] else element.ServiceState.ENABLED.value) + + +class InstanceCreated(VersionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova instance.update notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='instance.update', + # To be "fully" created, an instance transitions + # from the 'building' state to the 'active' one. + # See http://docs.openstack.org/developer/nova/vmstates.html + payload={ + 'nova_object.data': { + 'state': element.InstanceState.ACTIVE.value, + 'state_update': { + 'nova_object.data': { + 'old_state': element.InstanceState.BUILDING.value, + 'state': element.InstanceState.ACTIVE.value, + }, + 'nova_object.name': 'InstanceStateUpdatePayload', + 'nova_object.namespace': 'nova', + }, + } + } + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + instance_data = payload['nova_object.data'] + + instance_uuid = instance_data['uuid'] + instance = self.get_or_create_instance(instance_uuid) + + self.update_instance(instance, payload) + + +class InstanceUpdated(VersionnedNotificationEndpoint): + + @staticmethod + def _match_not_new_instance_state(data): + is_new_instance = ( + data['old_state'] == element.InstanceState.BUILDING.value and + data['state'] == element.InstanceState.ACTIVE.value) + + return not is_new_instance + + @property + def filter_rule(self): + """Nova instance.update notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='instance.update', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + instance_data = payload['nova_object.data'] + instance_uuid = instance_data['uuid'] + instance = self.get_or_create_instance(instance_uuid) + + self.update_instance(instance, payload) + + +class InstanceDeletedEnd(VersionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova service.update notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='instance.delete.end', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + + instance_data = payload['nova_object.data'] + instance_uuid = instance_data['uuid'] + instance = self.get_or_create_instance(instance_uuid) + + node = self.get_or_create_node(instance_data['host']) + + self.delete_instance(instance, node) + + +class LegacyInstanceUpdated(UnversionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova compute.instance.update notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='compute.instance.update', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + + instance_uuid = payload['instance_id'] + instance = self.get_or_create_instance(instance_uuid) + + self.legacy_update_instance(instance, payload) + + +class LegacyInstanceCreatedEnd(UnversionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova compute.instance.create.end notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='compute.instance.create.end', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + + instance_uuid = payload['instance_id'] + instance = self.get_or_create_instance(instance_uuid) + + self.legacy_update_instance(instance, payload) + + +class LegacyInstanceDeletedEnd(UnversionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova compute.instance.delete.end notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='compute.instance.delete.end', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + instance_uuid = payload['instance_id'] + instance = self.get_or_create_instance(instance_uuid) + + node = self.get_or_create_node(payload['host']) + + self.delete_instance(instance, node) + + +class LegacyLiveMigratedEnd(UnversionnedNotificationEndpoint): + + @property + def filter_rule(self): + """Nova *.live_migration.post.dest.end notification filter""" + return filtering.NotificationFilter( + publisher_id=self.publisher_id_regex, + event_type='compute.instance.live_migration.post.dest.end', + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + LOG.info(_LI("Event '%(event)s' received from %(publisher)s") % + dict(event=event_type, publisher=publisher_id)) + + instance_uuid = payload['instance_id'] + instance = self.get_or_create_instance(instance_uuid) + + self.legacy_update_instance(instance, payload) diff --git a/watcher/decision_engine/rpcapi.py b/watcher/decision_engine/rpcapi.py index 6c56c2083..6da9a03f5 100644 --- a/watcher/decision_engine/rpcapi.py +++ b/watcher/decision_engine/rpcapi.py @@ -52,6 +52,8 @@ class DecisionEngineAPIManager(object): conductor_endpoints = [] status_endpoints = [notification_handler.NotificationHandler] + notification_endpoints = [] + notification_topics = [] def __init__(self): self.publisher_id = CONF.watcher_decision_engine.publisher_id diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index 7da849cd3..65f63b683 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -209,24 +209,24 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): """ return self.migration_attempts - def calculate_weight(self, node, total_cores_used, total_disk_used, - total_memory_used): + def calculate_weight(self, compute_resource, total_cores_used, + total_disk_used, total_memory_used): """Calculate weight of every resource - :param element: + :param compute_resource: :param total_cores_used: :param total_disk_used: :param total_memory_used: :return: """ cpu_capacity = self.compute_model.get_resource_from_id( - element.ResourceType.cpu_cores).get_capacity(node) + element.ResourceType.cpu_cores).get_capacity(compute_resource) disk_capacity = self.compute_model.get_resource_from_id( - element.ResourceType.disk).get_capacity(node) + element.ResourceType.disk).get_capacity(compute_resource) memory_capacity = self.compute_model.get_resource_from_id( - element.ResourceType.memory).get_capacity(node) + element.ResourceType.memory).get_capacity(compute_resource) score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) / float(cpu_capacity)) @@ -261,10 +261,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): if host_avg_cpu_util is None: LOG.error( _LE("No values returned by %(resource_id)s " - "for %(metric_name)s"), - resource_id=resource_id, - metric_name=self.HOST_CPU_USAGE_METRIC_NAME, - ) + "for %(metric_name)s") % dict( + resource_id=resource_id, + metric_name=self.HOST_CPU_USAGE_METRIC_NAME)) host_avg_cpu_util = 100 cpu_capacity = self.compute_model.get_resource_from_id( @@ -302,10 +301,9 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): if instance_cpu_utilization is None: LOG.error( _LE("No values returned by %(resource_id)s " - "for %(metric_name)s"), - resource_id=instance.uuid, - metric_name=self.INSTANCE_CPU_USAGE_METRIC_NAME, - ) + "for %(metric_name)s") % dict( + resource_id=instance.uuid, + metric_name=self.INSTANCE_CPU_USAGE_METRIC_NAME)) instance_cpu_utilization = 100 cpu_capacity = self.compute_model.get_resource_from_id( @@ -335,18 +333,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def score_of_nodes(self, score): """Calculate score of nodes based on load by VMs""" - for node_id in self.compute_model.get_all_compute_nodes(): - node = self.compute_model. \ - get_node_from_id(node_id) - count = self.compute_model.get_mapping(). \ - get_node_instances_from_id(node_id) + for node in self.compute_model.get_all_compute_nodes().values(): + count = self.compute_model.mapping.get_node_instances_from_id( + node.uuid) if len(count) > 0: result = self.calculate_score_node(node) else: # The node has not VMs result = 0 if len(count) > 0: - score.append((node_id, result)) + score.append((node.uuid, result)) return score def node_and_instance_score(self, sorted_score, score): @@ -368,7 +364,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def create_migration_instance(self, mig_instance, mig_source_node, mig_destination_node): """Create migration VM""" - if self.compute_model.get_mapping().migrate_instance( + if self.compute_model.migrate_instance( mig_instance, mig_source_node, mig_destination_node): self.add_migration(mig_instance.uuid, 'live', mig_source_node.uuid, @@ -427,14 +423,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): self.compute_attempts(size_cluster) - for node_id in self.compute_model.get_all_compute_nodes(): - node = self.compute_model.get_node_from_id(node_id) - count = self.compute_model.get_mapping(). \ - get_node_instances_from_id(node_id) - if len(count) == 0: + for node_uuid, node in self.compute_model.get_all_compute_nodes( + ).items(): + node_instances = (self.compute_model.mapping + .get_node_instances_from_id(node_uuid)) + if node_instances: if node.state == element.ServiceState.ENABLED: self.add_change_service_state( - node_id, element.ServiceState.DISABLED.value) + node_uuid, element.ServiceState.DISABLED.value) while self.get_allowed_migration_attempts() >= unsuccessful_migration: if not first_migration: diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index 50baa4483..e11a732e9 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -272,7 +272,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): # always use the host with lowerest outlet temperature mig_destination_node = dest_servers[0]['node'] # generate solution to migrate the instance to the dest server, - if self.compute_model.mapping.migrate_instance( + if self.compute_model.migrate_instance( instance_src, mig_source_node, mig_destination_node): parameters = {'migration_type': 'live', 'source_node': mig_source_node.uuid, diff --git a/watcher/decision_engine/strategy/strategies/uniform_airflow.py b/watcher/decision_engine/strategy/strategies/uniform_airflow.py index 75a7cb1f4..020f246a8 100644 --- a/watcher/decision_engine/strategy/strategies/uniform_airflow.py +++ b/watcher/decision_engine/strategy/strategies/uniform_airflow.py @@ -333,7 +333,7 @@ class UniformAirflow(base.BaseStrategy): for info in destination_hosts: instance = info['instance'] destination_node = info['node'] - if self.compute_model.mapping.migrate_instance( + if self.compute_model.migrate_instance( instance, source_node, destination_node): parameters = {'migration_type': 'live', 'source_node': source_node.uuid, diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index 5bf22ba9b..cd2475cb9 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -329,7 +329,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): # always use the host with lowerest CPU utilization mig_destination_node = destination_hosts[0]['node'] # generate solution to migrate the instance to the dest server, - if self.compute_model.mapping.migrate_instance( + if self.compute_model.migrate_instance( instance_src, source_node, mig_destination_node): parameters = {'migration_type': 'live', 'source_node': source_node.uuid, diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index eb60ff053..bf55fbafa 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -350,7 +350,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def create_migration_instance(self, mig_instance, mig_source_node, mig_destination_node): """Create migration VM """ - if self.compute_model.get_mapping().migrate_instance( + if self.compute_model.migrate_instance( mig_instance, mig_source_node, mig_destination_node): self.add_migration(mig_instance.uuid, 'live', mig_source_node.uuid, diff --git a/watcher/tests/applier/action_plan/test_default_action_handler.py b/watcher/tests/applier/action_plan/test_default_action_handler.py index 69c52a08f..abe40407c 100644 --- a/watcher/tests/applier/action_plan/test_default_action_handler.py +++ b/watcher/tests/applier/action_plan/test_default_action_handler.py @@ -32,18 +32,17 @@ class TestDefaultActionPlanHandler(base.DbTestCase): self.context) def test_launch_action_plan(self): - command = default.DefaultActionPlanHandler(self.context, - mock.MagicMock(), - self.action_plan.uuid) + command = default.DefaultActionPlanHandler( + self.context, mock.MagicMock(), self.action_plan.uuid) command.execute() - action_plan = ap_objects.ActionPlan.get_by_uuid(self.context, - self.action_plan.uuid) + action_plan = ap_objects.ActionPlan.get_by_uuid( + self.context, self.action_plan.uuid) self.assertEqual(ap_objects.State.SUCCEEDED, action_plan.state) def test_trigger_audit_send_notification(self): messaging = mock.MagicMock() - command = default.DefaultActionPlanHandler(self.context, messaging, - self.action_plan.uuid) + command = default.DefaultActionPlanHandler( + self.context, messaging, self.action_plan.uuid) command.execute() call_on_going = mock.call(ev.EventTypes.LAUNCH_ACTION_PLAN.name, { @@ -54,6 +53,5 @@ class TestDefaultActionPlanHandler(base.DbTestCase): 'action_plan__uuid': self.action_plan.uuid}) calls = [call_on_going, call_succeeded] - messaging.status_topic_handler.publish_event.assert_has_calls(calls) - self.assertEqual( - 2, messaging.status_topic_handler.publish_event.call_count) + messaging.publish_status_event.assert_has_calls(calls) + self.assertEqual(2, messaging.publish_status_event.call_count) diff --git a/watcher/tests/applier/test_applier_manager.py b/watcher/tests/applier/test_applier_manager.py index 1c6f8c566..19532ad4e 100644 --- a/watcher/tests/applier/test_applier_manager.py +++ b/watcher/tests/applier/test_applier_manager.py @@ -17,7 +17,7 @@ # limitations under the License. # -from mock import patch +import mock from watcher.applier import manager as applier_manager from watcher.common.messaging import messaging_handler @@ -30,10 +30,10 @@ class TestApplierManager(base.TestCase): super(TestApplierManager, self).setUp() self.applier = service.Service(applier_manager.ApplierManager) - @patch.object(messaging_handler.MessagingHandler, "stop") - @patch.object(messaging_handler.MessagingHandler, "start") + @mock.patch.object(messaging_handler.MessagingHandler, "stop") + @mock.patch.object(messaging_handler.MessagingHandler, "start") def test_start(self, m_messaging_start, m_messaging_stop): self.applier.start() self.applier.stop() - self.assertEqual(2, m_messaging_start.call_count) - self.assertEqual(2, m_messaging_stop.call_count) + self.assertEqual(1, m_messaging_start.call_count) + self.assertEqual(1, m_messaging_stop.call_count) diff --git a/watcher/tests/base.py b/watcher/tests/base.py index 80fcadc62..a2f45aa7c 100644 --- a/watcher/tests/base.py +++ b/watcher/tests/base.py @@ -21,6 +21,7 @@ import os import mock from oslo_config import cfg from oslo_log import log +from oslo_messaging import conffixture from oslotest import base import pecan from pecan import testing @@ -52,10 +53,19 @@ class TestCase(BaseTestCase): def setUp(self): super(TestCase, self).setUp() self.useFixture(conf_fixture.ConfReloadFixture()) - self.app = testing.load_test_app(os.path.join( - os.path.dirname(__file__), - 'config.py' - )) + self.policy = self.useFixture(policy_fixture.PolicyFixture()) + self.messaging_conf = self.useFixture(conffixture.ConfFixture(CONF)) + self.messaging_conf.transport_driver = 'fake' + + cfg.CONF.set_override("auth_type", "admin_token", + group='keystone_authtoken', + enforce_type=True) + cfg.CONF.set_override("auth_uri", "http://127.0.0.1/identity", + group='keystone_authtoken', + enforce_type=True) + + app_config_path = os.path.join(os.path.dirname(__file__), 'config.py') + self.app = testing.load_test_app(app_config_path) token_info = { 'token': { 'project': { @@ -71,8 +81,6 @@ class TestCase(BaseTestCase): project_id='fake_project', user_id='fake_user') - self.policy = self.useFixture(policy_fixture.PolicyFixture()) - def make_context(*args, **kwargs): # If context hasn't been constructed with token_info if not kwargs.get('auth_token_info'): @@ -120,11 +128,8 @@ class TestCase(BaseTestCase): :param project_file: File whose path to return. Default: None. :returns: path to the specified file, or path to project root. """ - root = os.path.abspath(os.path.join(os.path.dirname(__file__), - '..', - '..', - ) - ) + root = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..')) if project_file: return os.path.join(root, project_file) else: diff --git a/watcher/tests/common/test_service.py b/watcher/tests/common/test_service.py index 5fe782a6d..9e6dbf81e 100644 --- a/watcher/tests/common/test_service.py +++ b/watcher/tests/common/test_service.py @@ -27,13 +27,15 @@ class DummyManager(object): API_VERSION = '1.0' - conductor_endpoints = [] - status_endpoints = [] + conductor_endpoints = [mock.Mock()] + status_endpoints = [mock.Mock()] + notification_endpoints = [mock.Mock()] def __init__(self): self.publisher_id = "pub_id" self.conductor_topic = "conductor_topic" self.status_topic = "status_topic" + self.notification_topics = [] self.api_version = self.API_VERSION @@ -84,7 +86,7 @@ class TestService(base.TestCase): m_handler.publish_event.assert_called_once_with(event, payload) @mock.patch.object(messaging_handler, "MessagingHandler") - def test_publish_status(self, m_handler_cls): + def test_publish_status_event(self, m_handler_cls): m_handler = mock.Mock() m_handler_cls.return_value = m_handler payload = { @@ -92,10 +94,10 @@ class TestService(base.TestCase): } event = "myevent" dummy_service = service.Service(DummyManager) - dummy_service.publish_status(event, payload) + dummy_service.publish_status_event(event, payload) m_handler.publish_event.assert_called_once_with(event, payload, None) - @mock.patch.object(service.Service, 'publish_status') + @mock.patch.object(service.Service, 'publish_status_event') def test_response(self, mock_call): event = "My event" context = {'request_id': 12} diff --git a/watcher/tests/decision_engine/audit/test_audit_handlers.py b/watcher/tests/decision_engine/audit/test_audit_handlers.py index 98c693b10..34f87d60d 100644 --- a/watcher/tests/decision_engine/audit/test_audit_handlers.py +++ b/watcher/tests/decision_engine/audit/test_audit_handlers.py @@ -73,9 +73,9 @@ class TestOneShotAuditHandler(base.DbTestCase): 'audit_uuid': self.audit.uuid}) calls = [call_on_going, call_succeeded] - messaging.status_topic_handler.publish_event.assert_has_calls(calls) + messaging.publish_status_event.assert_has_calls(calls) self.assertEqual( - 2, messaging.status_topic_handler.publish_event.call_count) + 2, messaging.publish_status_event.call_count) class TestContinuousAuditHandler(base.DbTestCase): diff --git a/watcher/tests/decision_engine/cluster/test_cluster_data_model_collector.py b/watcher/tests/decision_engine/cluster/test_cluster_data_model_collector.py index 6443cff68..f0d8433ee 100644 --- a/watcher/tests/decision_engine/cluster/test_cluster_data_model_collector.py +++ b/watcher/tests/decision_engine/cluster/test_cluster_data_model_collector.py @@ -23,6 +23,10 @@ from watcher.tests import base as test_base class DummyClusterDataModelCollector(base.BaseClusterDataModelCollector): + @property + def notification_endpoints(self): + return [] + def execute(self): model = model_root.ModelRoot() # Do something here... diff --git a/watcher/tests/decision_engine/model/notification/__init__.py b/watcher/tests/decision_engine/model/notification/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/tests/decision_engine/model/notification/data/instance-create.json b/watcher/tests/decision_engine/model/notification/data/instance-create.json new file mode 100644 index 000000000..ddb1aa056 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/instance-create.json @@ -0,0 +1,70 @@ +{ + "event_type": "instance.update", + "payload": { + "nova_object.data": { + "architecture": "x86_64", + "audit_period": { + "nova_object.data": { + "audit_period_beginning": "2012-10-01T00:00:00Z", + "audit_period_ending": "2012-10-29T13:42:11Z" + }, + "nova_object.name": "AuditPeriodPayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0" + }, + "availability_zone": null, + "bandwidth": [], + "created_at": "2012-10-29T13:42:11Z", + "deleted_at": null, + "display_name": "some-server", + "host": "compute", + "host_name": "some-server", + "image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6", + "kernel_id": "", + "launched_at": null, + "metadata": {}, + "node": "fake-mini", + "old_display_name": null, + "os_type": null, + "progress": 0, + "ramdisk_id": "", + "reservation_id": "r-sd3ygfjj", + "state": "active", + "task_state": "scheduling", + "power_state": "pending", + "ip_addresses": [], + "state_update": { + "nova_object.version": "1.0", + "nova_object.name": "InstanceStateUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.data": { + "old_state": "building", + "new_task_state": null, + "old_task_state": "spawning", + "state": "active" + } + }, + "tenant_id": "6f70656e737461636b20342065766572", + "terminated_at": null, + "flavor": { + "nova_object.name": "FlavorPayload", + "nova_object.data": { + "flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3", + "root_gb": 1, + "vcpus": 1, + "ephemeral_gb": 0, + "memory_mb": 512 + }, + "nova_object.version": "1.0", + "nova_object.namespace": "nova" + }, + "user_id": "fake", + "uuid": "c03c0bf9-f46e-4e4f-93f1-817568567ee2" + }, + "nova_object.name": "InstanceUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0" + }, + "priority": "INFO", + "publisher_id": "nova-compute:compute" +} diff --git a/watcher/tests/decision_engine/model/notification/data/instance-delete-end.json b/watcher/tests/decision_engine/model/notification/data/instance-delete-end.json new file mode 100644 index 000000000..75eaffaa5 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/instance-delete-end.json @@ -0,0 +1,49 @@ +{ + "event_type":"instance.delete.end", + "payload":{ + "nova_object.data":{ + "architecture":"x86_64", + "availability_zone":null, + "created_at":"2012-10-29T13:42:11Z", + "deleted_at":"2012-10-29T13:42:11Z", + "display_name":"some-server", + "fault":null, + "host":"compute", + "host_name":"some-server", + "ip_addresses":[], + "kernel_id":"", + "launched_at":"2012-10-29T13:42:11Z", + "image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6", + "metadata":{}, + "node":"fake-mini", + "os_type":null, + "progress":0, + "ramdisk_id":"", + "reservation_id":"r-npxv0e40", + "state":"deleted", + "task_state":null, + "power_state":"pending", + "tenant_id":"6f70656e737461636b20342065766572", + "terminated_at":"2012-10-29T13:42:11Z", + "flavor": { + "nova_object.name": "FlavorPayload", + "nova_object.data": { + "flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3", + "root_gb": 1, + "vcpus": 1, + "ephemeral_gb": 0, + "memory_mb": 512 + }, + "nova_object.version": "1.0", + "nova_object.namespace": "nova" + }, + "user_id":"fake", + "uuid":"178b0921-8f85-4257-88b6-2e743b5a975c" + }, + "nova_object.name":"InstanceActionPayload", + "nova_object.namespace":"nova", + "nova_object.version":"1.0" + }, + "priority":"INFO", + "publisher_id":"nova-compute:compute" +} diff --git a/watcher/tests/decision_engine/model/notification/data/instance-update.json b/watcher/tests/decision_engine/model/notification/data/instance-update.json new file mode 100644 index 000000000..f79485a04 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/instance-update.json @@ -0,0 +1,65 @@ +{ + "event_type": "instance.update", + "payload": { + "nova_object.data": { + "architecture": "x86_64", + "audit_period": { + "nova_object.data": { + "audit_period_beginning": "2012-10-01T00:00:00Z", + "audit_period_ending": "2012-10-29T13:42:11Z"}, + "nova_object.name": "AuditPeriodPayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0" + }, + "availability_zone": null, + "bandwidth": [], + "created_at": "2012-10-29T13:42:11Z", + "deleted_at": null, + "display_name": "some-server", + "host": "compute", + "host_name": "some-server", + "image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6", + "kernel_id": "", + "launched_at": null, + "metadata": {}, + "node": "fake-mini", + "old_display_name": null, + "os_type": null, + "progress": 0, + "ramdisk_id": "", + "reservation_id": "r-sd3ygfjj", + "state": "active", + "task_state": "scheduling", + "power_state": "pending", + "ip_addresses": [], + "state_update": { + "nova_object.data": { + "new_task_state": null, + "old_state": null, + "old_task_state": null, + "state": "active"}, + "nova_object.name": "InstanceStateUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0"}, + "tenant_id": "6f70656e737461636b20342065766572", + "terminated_at": null, + "flavor": { + "nova_object.name": "FlavorPayload", + "nova_object.data": { + "flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3", + "root_gb": 1, + "vcpus": 1, + "ephemeral_gb": 0, + "memory_mb": 512 + }, + "nova_object.version": "1.0", + "nova_object.namespace": "nova" + }, + "user_id": "fake", + "uuid": "c03c0bf9-f46e-4e4f-93f1-817568567ee2"}, + "nova_object.name": "InstanceUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0"}, + "priority": "INFO", + "publisher_id": "nova-compute:compute" +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_instance-create.json b/watcher/tests/decision_engine/model/notification/data/scenario3_instance-create.json new file mode 100644 index 000000000..d180f8d85 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_instance-create.json @@ -0,0 +1,70 @@ +{ + "event_type": "instance.update", + "payload": { + "nova_object.data": { + "architecture": "x86_64", + "audit_period": { + "nova_object.data": { + "audit_period_beginning": "2012-10-01T00:00:00Z", + "audit_period_ending": "2012-10-29T13:42:11Z" + }, + "nova_object.name": "AuditPeriodPayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0" + }, + "availability_zone": null, + "bandwidth": [], + "created_at": "2012-10-29T13:42:11Z", + "deleted_at": null, + "display_name": "some-server", + "host": "Node_0", + "host_name": "some-server", + "image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6", + "kernel_id": "", + "launched_at": null, + "metadata": {}, + "node": "hostname_0", + "old_display_name": null, + "os_type": null, + "progress": 0, + "ramdisk_id": "", + "reservation_id": "r-sd3ygfjj", + "state": "active", + "task_state": "scheduling", + "power_state": "pending", + "ip_addresses": [], + "state_update": { + "nova_object.version": "1.0", + "nova_object.name": "InstanceStateUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.data": { + "old_state": "building", + "new_task_state": null, + "old_task_state": "spawning", + "state": "active" + } + }, + "tenant_id": "6f70656e737461636b20342065766572", + "terminated_at": null, + "flavor": { + "nova_object.name": "FlavorPayload", + "nova_object.data": { + "flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3", + "root_gb": 1, + "vcpus": 1, + "ephemeral_gb": 0, + "memory_mb": 512 + }, + "nova_object.version": "1.0", + "nova_object.namespace": "nova" + }, + "user_id": "fake", + "uuid": "c03c0bf9-f46e-4e4f-93f1-817568567ee2" + }, + "nova_object.name": "InstanceUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0" + }, + "priority": "INFO", + "publisher_id": "nova-compute:Node_0" +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_instance-delete-end.json b/watcher/tests/decision_engine/model/notification/data/scenario3_instance-delete-end.json new file mode 100644 index 000000000..90898b8a7 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_instance-delete-end.json @@ -0,0 +1,49 @@ +{ + "event_type":"instance.delete.end", + "payload":{ + "nova_object.data":{ + "architecture":"x86_64", + "availability_zone":null, + "created_at":"2012-10-29T13:42:11Z", + "deleted_at":"2012-10-29T13:42:11Z", + "display_name":"some-server", + "fault":null, + "host":"Node_0", + "host_name":"some-server", + "ip_addresses":[], + "kernel_id":"", + "launched_at":"2012-10-29T13:42:11Z", + "image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6", + "metadata":{}, + "node":"fake-mini", + "os_type":null, + "progress":0, + "ramdisk_id":"", + "reservation_id":"r-npxv0e40", + "state":"deleted", + "task_state":null, + "power_state":"pending", + "tenant_id":"6f70656e737461636b20342065766572", + "terminated_at":"2012-10-29T13:42:11Z", + "flavor": { + "nova_object.name": "FlavorPayload", + "nova_object.data": { + "flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3", + "root_gb": 1, + "vcpus": 1, + "ephemeral_gb": 0, + "memory_mb": 512 + }, + "nova_object.version": "1.0", + "nova_object.namespace": "nova" + }, + "user_id":"fake", + "uuid":"73b09e16-35b7-4922-804e-e8f5d9b740fc" + }, + "nova_object.name":"InstanceActionPayload", + "nova_object.namespace":"nova", + "nova_object.version":"1.0" + }, + "priority":"INFO", + "publisher_id":"nova-compute:Node_0" +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_instance-update.json b/watcher/tests/decision_engine/model/notification/data/scenario3_instance-update.json new file mode 100644 index 000000000..23d23b9e1 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_instance-update.json @@ -0,0 +1,65 @@ +{ + "event_type": "instance.update", + "payload": { + "nova_object.data": { + "architecture": "x86_64", + "audit_period": { + "nova_object.data": { + "audit_period_beginning": "2012-10-01T00:00:00Z", + "audit_period_ending": "2012-10-29T13:42:11Z"}, + "nova_object.name": "AuditPeriodPayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0" + }, + "availability_zone": null, + "bandwidth": [], + "created_at": "2012-10-29T13:42:11Z", + "deleted_at": null, + "display_name": "NEW_INSTANCE0", + "host": "Node_0", + "host_name": "NEW_INSTANCE0", + "image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6", + "kernel_id": "", + "launched_at": null, + "metadata": {}, + "node": "hostname_0", + "old_display_name": null, + "os_type": null, + "progress": 0, + "ramdisk_id": "", + "reservation_id": "r-sd3ygfjj", + "state": "paused", + "task_state": "scheduling", + "power_state": "pending", + "ip_addresses": [], + "state_update": { + "nova_object.data": { + "old_task_state": null, + "new_task_state": null, + "old_state": "paused", + "state": "paused"}, + "nova_object.name": "InstanceStateUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0"}, + "tenant_id": "6f70656e737461636b20342065766572", + "terminated_at": null, + "flavor": { + "nova_object.name": "FlavorPayload", + "nova_object.data": { + "flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3", + "root_gb": 1, + "vcpus": 1, + "ephemeral_gb": 0, + "memory_mb": 512 + }, + "nova_object.version": "1.0", + "nova_object.namespace": "nova" + }, + "user_id": "fake", + "uuid": "73b09e16-35b7-4922-804e-e8f5d9b740fc"}, + "nova_object.name": "InstanceUpdatePayload", + "nova_object.namespace": "nova", + "nova_object.version": "1.0"}, + "priority": "INFO", + "publisher_id": "nova-compute:Node_0" +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-create-end.json b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-create-end.json new file mode 100644 index 000000000..3a0b36658 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-create-end.json @@ -0,0 +1,62 @@ +{ + "event_type": "compute.instance.create.end", + "metadata": { + "message_id": "577bfd11-88e0-4044-b8ae-496e3257efe2", + "timestamp": "2016-08-19 10:20:59.279903" + }, + "payload": { + "access_ip_v4": null, + "access_ip_v6": null, + "architecture": null, + "availability_zone": "nova", + "cell_name": "", + "created_at": "2016-08-19 10:20:49+00:00", + "deleted_at": "", + "disk_gb": 1, + "display_name": "INSTANCE_0", + "ephemeral_gb": 0, + "fixed_ips": [ + { + "address": "192.168.1.197", + "floating_ips": [], + "label": "demo-net", + "meta": {}, + "type": "fixed", + "version": 4, + "vif_mac": "fa:16:3e:a3:c0:0f" + } + ], + "host": "Node_0", + "hostname": "INSTANCE_0", + "image_meta": { + "base_image_ref": "205f96f5-91f9-42eb-9138-03fffcea2b97", + "container_format": "bare", + "disk_format": "qcow2", + "min_disk": "1", + "min_ram": "0" + }, + "image_ref_url": "http://127.0.0.1:9292/images/205f96f5-91f9-42eb-9138-03fffcea2b97", + "instance_flavor_id": "1", + "instance_id": "c03c0bf9-f46e-4e4f-93f1-817568567ee2", + "instance_type": "m1.tiny", + "instance_type_id": 2, + "kernel_id": "", + "launched_at": "2016-08-19T10:20:59.135390", + "memory_mb": 512, + "message": "Success", + "metadata": {}, + "node": "Node_0", + "os_type": null, + "progress": "", + "ramdisk_id": "", + "reservation_id": "r-56edz88e", + "root_gb": 1, + "state": "active", + "state_description": "", + "tenant_id": "57ab04ad6d3b495789a58258bc00842b", + "terminated_at": "", + "user_id": "cd7d93be51e4460ab51514b2a925b23a", + "vcpus": 1 + }, + "publisher_id": "compute.Node_0" +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-delete-end.json b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-delete-end.json new file mode 100644 index 000000000..12b0a129d --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-delete-end.json @@ -0,0 +1,46 @@ +{ + "publisher_id": "compute:compute", + "event_type": "compute.instance.delete.end", + "payload": { + "access_ip_v4": null, + "access_ip_v6": null, + "architecture": null, + "availability_zone": "nova", + "cell_name": "", + "created_at": "2016-08-17 15:10:12+00:00", + "deleted_at": "2016-08-17T15:10:33.000000", + "disk_gb": 1, + "display_name": "some-server", + "ephemeral_gb": 0, + "host": "Node_0", + "hostname": "some-server", + "image_meta": { + "base_image_ref": "205f96f5-91f9-42eb-9138-03fffcea2b97", + "container_format": "bare", + "disk_format": "qcow2", + "min_disk": "1", + "min_ram": "0" + }, + "image_ref_url": "http://10.50.254.222:9292/images/205f96f5-91f9-42eb-9138-03fffcea2b97", + "instance_flavor_id": "1", + "instance_id": "73b09e16-35b7-4922-804e-e8f5d9b740fc", + "instance_type": "m1.tiny", + "instance_type_id": 2, + "kernel_id": "", + "launched_at": "2016-08-17T15:10:23.000000", + "memory_mb": 512, + "metadata": {}, + "node": "Node_0", + "os_type": null, + "progress": "", + "ramdisk_id": "", + "reservation_id": "r-z76fnsyy", + "root_gb": 1, + "state": "deleted", + "state_description": "", + "tenant_id": "15995ea2694e4268b3631db32e38678b", + "terminated_at": "2016-08-17T15:10:33.008164", + "user_id": "cd7d93be51e4460ab51514b2a925b23a", + "vcpus": 1 + } +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-update.json b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-update.json new file mode 100644 index 000000000..ce2b9979c --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_instance-update.json @@ -0,0 +1,52 @@ +{ + "publisher_id": "compute:Node_0", + "event_type": "compute.instance.update", + "payload": { + "access_ip_v4": null, + "access_ip_v6": null, + "architecture": null, + "audit_period_beginning": "2016-08-17T13:00:00.000000", + "audit_period_ending": "2016-08-17T13:56:05.262440", + "availability_zone": "nova", + "bandwidth": {}, + "cell_name": "", + "created_at": "2016-08-17 13:53:23+00:00", + "deleted_at": "", + "disk_gb": 1, + "display_name": "NEW_INSTANCE0", + "ephemeral_gb": 0, + "host": "Node_0", + "hostname": "NEW_INSTANCE0", + "image_meta": { + "base_image_ref": "205f96f5-91f9-42eb-9138-03fffcea2b97", + "container_format": "bare", + "disk_format": "qcow2", + "min_disk": "1", + "min_ram": "0" + }, + "image_ref_url": "http://10.50.0.222:9292/images/205f96f5-91f9-42eb-9138-03fffcea2b97", + "instance_flavor_id": "1", + "instance_id": "73b09e16-35b7-4922-804e-e8f5d9b740fc", + "instance_type": "m1.tiny", + "instance_type_id": 2, + "kernel_id": "", + "launched_at": "2016-08-17T13:53:35.000000", + "memory_mb": 512, + "metadata": {}, + "new_task_state": null, + "node": "hostname_0", + "old_state": "paused", + "old_task_state": null, + "os_type": null, + "progress": "", + "ramdisk_id": "", + "reservation_id": "r-0822ymml", + "root_gb": 1, + "state": "paused", + "state_description": "paused", + "tenant_id": "a4b4772d93c74d5e8b7c68cdd2a014e1", + "terminated_at": "", + "user_id": "ce64facc93354bbfa90f4f9f9a3e1e75", + "vcpus": 1 + } +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_livemigration-post-dest-end.json b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_livemigration-post-dest-end.json new file mode 100644 index 000000000..916b91b0c --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_legacy_livemigration-post-dest-end.json @@ -0,0 +1,61 @@ +{ + "event_type": "compute.instance.live_migration.post.dest.end", + "metadata": { + "message_id": "9f58cad4-ff90-40f8-a8e4-633807f4a995", + "timestamp": "2016-08-19 10:13:44.645575" + }, + "payload": { + "access_ip_v4": null, + "access_ip_v6": null, + "architecture": null, + "availability_zone": "nova", + "cell_name": "", + "created_at": "2016-08-18 09:49:23+00:00", + "deleted_at": "", + "disk_gb": 1, + "display_name": "INSTANCE_0", + "ephemeral_gb": 0, + "fixed_ips": [ + { + "address": "192.168.1.196", + "floating_ips": [], + "label": "demo-net", + "meta": {}, + "type": "fixed", + "version": 4, + "vif_mac": "fa:16:3e:cc:ba:81" + } + ], + "host": "Node_1", + "hostname": "INSTANCE_0", + "image_meta": { + "base_image_ref": "205f96f5-91f9-42eb-9138-03fffcea2b97", + "container_format": "bare", + "disk_format": "qcow2", + "min_disk": "1", + "min_ram": "0" + }, + "image_ref_url": "http://10.50.254.222:9292/images/205f96f5-91f9-42eb-9138-03fffcea2b97", + "instance_flavor_id": "1", + "instance_id": "73b09e16-35b7-4922-804e-e8f5d9b740fc", + "instance_type": "m1.tiny", + "instance_type_id": 2, + "kernel_id": "", + "launched_at": "2016-08-18T09:49:33.000000", + "memory_mb": 512, + "metadata": {}, + "node": "Node_1", + "os_type": null, + "progress": "", + "ramdisk_id": "", + "reservation_id": "r-he04tfco", + "root_gb": 1, + "state": "active", + "state_description": "", + "tenant_id": "57ab04ad6d3b495789a58258bc00842b", + "terminated_at": "", + "user_id": "cd7d93be51e4460ab51514b2a925b23a", + "vcpus": 1 + }, + "publisher_id": "compute.Node_1" +} diff --git a/watcher/tests/decision_engine/model/notification/data/scenario3_service-update.json b/watcher/tests/decision_engine/model/notification/data/scenario3_service-update.json new file mode 100644 index 000000000..410f12d0e --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/scenario3_service-update.json @@ -0,0 +1,21 @@ +{ + "priority": "INFO", + "payload": { + "nova_object.namespace": "nova", + "nova_object.name": "ServiceStatusPayload", + "nova_object.version": "1.0", + "nova_object.data": { + "host": "Node_0", + "disabled": true, + "last_seen_up": "2012-10-29T13:42:05Z", + "binary": "nova-compute", + "topic": "compute", + "disabled_reason": null, + "report_count": 1, + "forced_down": true, + "version": 15 + } + }, + "event_type": "service.update", + "publisher_id": "nova-compute:Node_0" +} diff --git a/watcher/tests/decision_engine/model/notification/data/service-update.json b/watcher/tests/decision_engine/model/notification/data/service-update.json new file mode 100644 index 000000000..1baf63a23 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/data/service-update.json @@ -0,0 +1,21 @@ +{ + "priority": "INFO", + "payload": { + "nova_object.namespace": "nova", + "nova_object.name": "ServiceStatusPayload", + "nova_object.version": "1.0", + "nova_object.data": { + "host": "host1", + "disabled": false, + "last_seen_up": "2012-10-29T13:42:05Z", + "binary": "nova-compute", + "topic": "compute", + "disabled_reason": null, + "report_count": 1, + "forced_down": false, + "version": 15 + } + }, + "event_type": "service.update", + "publisher_id": "nova-compute:host1" +} diff --git a/watcher/tests/decision_engine/model/notification/fake_managers.py b/watcher/tests/decision_engine/model/notification/fake_managers.py new file mode 100644 index 000000000..59b652bd1 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/fake_managers.py @@ -0,0 +1,53 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from watcher.decision_engine.model.notification import nova as novanotification +from watcher.tests.decision_engine.strategy.strategies \ + import faker_cluster_state + + +class FakeManager(object): + + API_VERSION = '1.0' + + def __init__(self): + self.api_version = self.API_VERSION + + # fake cluster instead on Nova CDM + self.fake_cdmc = faker_cluster_state.FakerModelCollector() + + self.publisher_id = 'test_publisher_id' + self.conductor_topic = 'test_conductor_topic' + self.status_topic = 'test_status_topic' + self.notification_topics = ['nova'] + + self.conductor_endpoints = [] # Disable audit endpoint + self.status_endpoints = [] + + self.notification_endpoints = [ + novanotification.ServiceUpdated(self.fake_cdmc), + + novanotification.InstanceCreated(self.fake_cdmc), + novanotification.InstanceUpdated(self.fake_cdmc), + novanotification.InstanceDeletedEnd(self.fake_cdmc), + + novanotification.LegacyInstanceCreatedEnd(self.fake_cdmc), + novanotification.LegacyInstanceUpdated(self.fake_cdmc), + novanotification.LegacyLiveMigratedEnd(self.fake_cdmc), + novanotification.LegacyInstanceDeletedEnd(self.fake_cdmc), + ] diff --git a/watcher/tests/decision_engine/model/notification/test_notifications.py b/watcher/tests/decision_engine/model/notification/test_notifications.py new file mode 100644 index 000000000..22fb30d8b --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/test_notifications.py @@ -0,0 +1,106 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import mock +from oslo_serialization import jsonutils + +from watcher.common import context +from watcher.common import service as watcher_service +from watcher.decision_engine.model.notification import base +from watcher.decision_engine.model.notification import filtering +from watcher.tests import base as base_test +from watcher.tests.decision_engine.model.notification import fake_managers + + +class DummyManager(fake_managers.FakeManager): + + def __init__(self): + super(DummyManager, self).__init__() + self.notification_endpoints = [DummyNotification(self.fake_cdmc)] + + +class DummyNotification(base.NotificationEndpoint): + + @property + def filter_rule(self): + return filtering.NotificationFilter( + publisher_id=r'.*', + event_type=r'compute.dummy', + payload={'data': {'nested': r'^T.*'}}, + ) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + pass + + +class NotificationTestCase(base_test.TestCase): + + def load_message(self, filename): + cwd = os.path.abspath(os.path.dirname(__file__)) + data_folder = os.path.join(cwd, "data") + + with open(os.path.join(data_folder, filename), 'rb') as json_file: + json_data = jsonutils.load(json_file) + + return json_data + + +class TestReceiveNotifications(NotificationTestCase): + + def setUp(self): + super(TestReceiveNotifications, self).setUp() + + p_from_dict = mock.patch.object(context.RequestContext, 'from_dict') + m_from_dict = p_from_dict.start() + m_from_dict.return_value = self.context + self.addCleanup(p_from_dict.stop) + + @mock.patch.object(DummyNotification, 'info') + def test_receive_dummy_notification(self, m_info): + message = { + 'publisher_id': 'nova-compute', + 'event_type': 'compute.dummy', + 'payload': {'data': {'nested': 'TEST'}}, + 'priority': 'INFO', + } + de_service = watcher_service.Service(DummyManager) + incoming = mock.Mock(ctxt=self.context.to_dict(), message=message) + + de_service.notification_handler.dispatcher.dispatch(incoming) + + m_info.assert_called_once_with( + self.context, 'nova-compute', 'compute.dummy', + {'data': {'nested': 'TEST'}}, + {'message_id': None, 'timestamp': None}) + + @mock.patch.object(DummyNotification, 'info') + def test_skip_unwanted_notification(self, m_info): + message = { + 'publisher_id': 'nova-compute', + 'event_type': 'compute.dummy', + 'payload': {'data': {'nested': 'unwanted'}}, + 'priority': 'INFO', + } + de_service = watcher_service.Service(DummyManager) + incoming = mock.Mock(ctxt=self.context.to_dict(), message=message) + + de_service.notification_handler.dispatcher.dispatch(incoming) + + self.assertEqual(0, m_info.call_count) diff --git a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py new file mode 100644 index 000000000..f183d9243 --- /dev/null +++ b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py @@ -0,0 +1,450 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import mock +from oslo_serialization import jsonutils + +from watcher.common import context +from watcher.common import exception +from watcher.common import service as watcher_service +from watcher.decision_engine.model import element +from watcher.decision_engine.model import model_root +from watcher.decision_engine.model.notification import nova as novanotification +from watcher.tests import base as base_test +from watcher.tests.decision_engine.model.notification import fake_managers +from watcher.tests.decision_engine.strategy.strategies \ + import faker_cluster_state + + +class NotificationTestCase(base_test.TestCase): + + def load_message(self, filename): + cwd = os.path.abspath(os.path.dirname(__file__)) + data_folder = os.path.join(cwd, "data") + + with open(os.path.join(data_folder, filename), 'rb') as json_file: + json_data = jsonutils.load(json_file) + + return json_data + + +class TestReceiveNovaNotifications(NotificationTestCase): + + FAKE_METADATA = {'message_id': None, 'timestamp': None} + + def setUp(self): + super(TestReceiveNovaNotifications, self).setUp() + + p_from_dict = mock.patch.object(context.RequestContext, 'from_dict') + m_from_dict = p_from_dict.start() + m_from_dict.return_value = self.context + self.addCleanup(p_from_dict.stop) + + @mock.patch.object(novanotification.ServiceUpdated, 'info') + def test_nova_receive_service_update(self, m_info): + message = self.load_message('service-update.json') + expected_message = message['payload'] + + de_service = watcher_service.Service(fake_managers.FakeManager) + incoming = mock.Mock(ctxt=self.context.to_dict(), message=message) + + de_service.notification_handler.dispatcher.dispatch(incoming) + m_info.assert_called_once_with( + self.context, 'nova-compute:host1', 'service.update', + expected_message, self.FAKE_METADATA) + + @mock.patch.object(novanotification.InstanceCreated, 'info') + def test_nova_receive_instance_create(self, m_info): + message = self.load_message('instance-create.json') + expected_message = message['payload'] + + de_service = watcher_service.Service(fake_managers.FakeManager) + incoming = mock.Mock(ctxt=self.context.to_dict(), message=message) + + de_service.notification_handler.dispatcher.dispatch(incoming) + m_info.assert_called_once_with( + self.context, 'nova-compute:compute', 'instance.update', + expected_message, self.FAKE_METADATA) + + @mock.patch.object(novanotification.InstanceUpdated, 'info') + def test_nova_receive_instance_update(self, m_info): + message = self.load_message('instance-update.json') + expected_message = message['payload'] + + de_service = watcher_service.Service(fake_managers.FakeManager) + incoming = mock.Mock(ctxt=self.context.to_dict(), message=message) + + de_service.notification_handler.dispatcher.dispatch(incoming) + m_info.assert_called_once_with( + self.context, 'nova-compute:compute', 'instance.update', + expected_message, self.FAKE_METADATA) + + @mock.patch.object(novanotification.InstanceDeletedEnd, 'info') + def test_nova_receive_instance_delete_end(self, m_info): + message = self.load_message('instance-delete-end.json') + expected_message = message['payload'] + + de_service = watcher_service.Service(fake_managers.FakeManager) + incoming = mock.Mock(ctxt=self.context.to_dict(), message=message) + + de_service.notification_handler.dispatcher.dispatch(incoming) + m_info.assert_called_once_with( + self.context, 'nova-compute:compute', 'instance.delete.end', + expected_message, self.FAKE_METADATA) + + +class TestNovaNotifications(NotificationTestCase): + + FAKE_METADATA = {'message_id': None, 'timestamp': None} + + def setUp(self): + super(TestNovaNotifications, self).setUp() + # fake cluster + self.fake_cdmc = faker_cluster_state.FakerModelCollector() + + def test_nova_service_update(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.ServiceUpdated(self.fake_cdmc) + + node0_uuid = 'Node_0' + node0 = compute_model.get_node_from_id(node0_uuid) + + message = self.load_message('scenario3_service-update.json') + + self.assertEqual('hostname_0', node0.hostname) + self.assertEqual(element.ServiceState.ONLINE.value, node0.state) + self.assertEqual(element.ServiceState.ENABLED.value, node0.status) + + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + self.assertEqual('Node_0', node0.hostname) + self.assertEqual(element.ServiceState.OFFLINE.value, node0.state) + self.assertEqual(element.ServiceState.DISABLED.value, node0.status) + + def test_nova_instance_update(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.InstanceUpdated(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + instance0 = compute_model.get_instance_from_id(instance0_uuid) + + message = self.load_message('scenario3_instance-update.json') + + self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) + + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) + + def test_nova_instance_update_notfound_creates(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.InstanceUpdated(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + + message = self.load_message('scenario3_instance-update.json') + + with mock.patch.object( + model_root.ModelRoot, 'get_instance_from_id' + ) as m_get_instance_from_id: + m_get_instance_from_id.side_effect = exception.InstanceNotFound( + name='TEST') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + instance0 = compute_model.get_instance_from_id(instance0_uuid) + cpu_capacity = compute_model.get_resource_from_id( + element.ResourceType.cpu_cores) + disk_capacity = compute_model.get_resource_from_id( + element.ResourceType.disk) + memory_capacity = compute_model.get_resource_from_id( + element.ResourceType.memory) + + self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) + self.assertEqual(1, cpu_capacity.get_capacity(instance0)) + self.assertEqual(1, disk_capacity.get_capacity(instance0)) + self.assertEqual(512, memory_capacity.get_capacity(instance0)) + + def test_nova_instance_create(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.InstanceCreated(self.fake_cdmc) + + instance0_uuid = 'c03c0bf9-f46e-4e4f-93f1-817568567ee2' + + self.assertRaises( + exception.InstanceNotFound, + compute_model.get_instance_from_id, instance0_uuid) + + message = self.load_message('scenario3_instance-create.json') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + instance0 = compute_model.get_instance_from_id(instance0_uuid) + cpu_capacity = compute_model.get_resource_from_id( + element.ResourceType.cpu_cores) + disk_capacity = compute_model.get_resource_from_id( + element.ResourceType.disk) + memory_capacity = compute_model.get_resource_from_id( + element.ResourceType.memory) + + self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) + self.assertEqual(1, cpu_capacity.get_capacity(instance0)) + self.assertEqual(1, disk_capacity.get_capacity(instance0)) + self.assertEqual(512, memory_capacity.get_capacity(instance0)) + + def test_nova_instance_delete_end(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.InstanceDeletedEnd(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + + # Before + self.assertTrue(compute_model.get_instance_from_id(instance0_uuid)) + for resource in compute_model.resource.values(): + self.assertIn(instance0_uuid, resource.mapping) + + message = self.load_message('scenario3_instance-delete-end.json') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + # After + self.assertRaises( + exception.InstanceNotFound, + compute_model.get_instance_from_id, instance0_uuid) + + for resource in compute_model.resource.values(): + self.assertNotIn(instance0_uuid, resource.mapping) + + +class TestLegacyNovaNotifications(NotificationTestCase): + + FAKE_METADATA = {'message_id': None, 'timestamp': None} + + def setUp(self): + super(TestLegacyNovaNotifications, self).setUp() + # fake cluster + self.fake_cdmc = faker_cluster_state.FakerModelCollector() + + def test_legacy_instance_created_end(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.LegacyInstanceCreatedEnd(self.fake_cdmc) + + instance0_uuid = 'c03c0bf9-f46e-4e4f-93f1-817568567ee2' + self.assertRaises( + exception.InstanceNotFound, + compute_model.get_instance_from_id, instance0_uuid) + + message = self.load_message( + 'scenario3_legacy_instance-create-end.json') + + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + instance0 = compute_model.get_instance_from_id(instance0_uuid) + cpu_capacity = compute_model.get_resource_from_id( + element.ResourceType.cpu_cores) + disk_capacity = compute_model.get_resource_from_id( + element.ResourceType.disk) + memory_capacity = compute_model.get_resource_from_id( + element.ResourceType.memory) + + self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) + self.assertEqual(1, cpu_capacity.get_capacity(instance0)) + self.assertEqual(1, disk_capacity.get_capacity(instance0)) + self.assertEqual(512, memory_capacity.get_capacity(instance0)) + + def test_legacy_instance_updated(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.LegacyInstanceUpdated(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + instance0 = compute_model.get_instance_from_id(instance0_uuid) + + message = self.load_message('scenario3_legacy_instance-update.json') + + self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) + + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) + + def test_legacy_instance_update_notfound_creates(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.LegacyInstanceUpdated(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + + message = self.load_message('scenario3_legacy_instance-update.json') + + with mock.patch.object( + model_root.ModelRoot, 'get_instance_from_id' + ) as m_get_instance_from_id: + m_get_instance_from_id.side_effect = exception.InstanceNotFound( + name='TEST') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + instance0 = compute_model.get_instance_from_id(instance0_uuid) + self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) + + def test_legacy_instance_update_node_notfound_stil_creates(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.LegacyInstanceUpdated(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + + message = self.load_message('scenario3_legacy_instance-update.json') + + with mock.patch.object( + model_root.ModelRoot, 'get_instance_from_id' + ) as m_get_instance_from_id: + m_get_instance_from_id.side_effect = exception.InstanceNotFound( + name='TEST') + with mock.patch.object( + model_root.ModelRoot, 'get_node_from_id' + ) as m_get_node_from_id: + m_get_node_from_id.side_effect = exception.ComputeNodeNotFound( + name='TEST') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + instance0 = compute_model.get_instance_from_id(instance0_uuid) + cpu_capacity = compute_model.get_resource_from_id( + element.ResourceType.cpu_cores) + disk_capacity = compute_model.get_resource_from_id( + element.ResourceType.disk) + memory_capacity = compute_model.get_resource_from_id( + element.ResourceType.memory) + + self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) + self.assertEqual(1, cpu_capacity.get_capacity(instance0)) + self.assertEqual(1, disk_capacity.get_capacity(instance0)) + self.assertEqual(512, memory_capacity.get_capacity(instance0)) + + def test_legacy_live_migrated_end(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.LegacyLiveMigratedEnd(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + instance0 = compute_model.get_instance_from_id(instance0_uuid) + + node = compute_model.get_node_from_instance_id(instance0_uuid) + self.assertEqual('Node_0', node.uuid) + + message = self.load_message( + 'scenario3_legacy_livemigration-post-dest-end.json') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + node = compute_model.get_node_from_instance_id(instance0_uuid) + self.assertEqual('Node_1', node.uuid) + self.assertEqual(element.InstanceState.ACTIVE.value, instance0.state) + + def test_legacy_instance_deleted_end(self): + compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() + self.fake_cdmc.cluster_data_model = compute_model + handler = novanotification.LegacyInstanceDeletedEnd(self.fake_cdmc) + + instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' + + # Before + self.assertTrue(compute_model.get_instance_from_id(instance0_uuid)) + for resource in compute_model.resource.values(): + self.assertIn(instance0_uuid, resource.mapping) + + message = self.load_message( + 'scenario3_legacy_instance-delete-end.json') + handler.info( + ctxt=self.context, + publisher_id=message['publisher_id'], + event_type=message['event_type'], + payload=message['payload'], + metadata=self.FAKE_METADATA, + ) + + # After + self.assertRaises( + exception.InstanceNotFound, + compute_model.get_instance_from_id, instance0_uuid) + + for resource in compute_model.resource.values(): + self.assertNotIn(instance0_uuid, resource.mapping) diff --git a/watcher/tests/decision_engine/model/test_mapping.py b/watcher/tests/decision_engine/model/test_mapping.py index 44b9a56fe..4c4ccc055 100644 --- a/watcher/tests/decision_engine/model/test_mapping.py +++ b/watcher/tests/decision_engine/model/test_mapping.py @@ -80,16 +80,16 @@ class TestMapping(base.TestCase): self.assertEqual( False, - model.mapping.migrate_instance(instance1, node1, node1)) + model.migrate_instance(instance1, node1, node1)) self.assertEqual( False, - model.mapping.migrate_instance(instance1, node0, node0)) + model.migrate_instance(instance1, node0, node0)) self.assertEqual( True, - model.mapping.migrate_instance(instance1, node1, node0)) + model.migrate_instance(instance1, node1, node0)) self.assertEqual( True, - model.mapping.migrate_instance(instance1, node0, node1)) + model.migrate_instance(instance1, node0, node1)) def test_unmap_from_id_log_warning(self): model = self.fake_cluster.generate_scenario_3_with_2_nodes() diff --git a/watcher/tests/decision_engine/model/test_model.py b/watcher/tests/decision_engine/model/test_model.py index 24d47b5d2..3cc45c328 100644 --- a/watcher/tests/decision_engine/model/test_model.py +++ b/watcher/tests/decision_engine/model/test_model.py @@ -73,11 +73,11 @@ class TestModel(base.TestCase): node.uuid = id_ model.add_node(node) - self.assertIsInstance(node.state, element.ServiceState) + self.assertIn(node.state, [el.value for el in element.ServiceState]) node = model.get_node_from_id(id_) - node.state = element.ServiceState.OFFLINE - self.assertIsInstance(node.state, element.ServiceState) + node.state = element.ServiceState.OFFLINE.value + self.assertIn(node.state, [el.value for el in element.ServiceState]) def test_node_from_id_raise(self): model = model_root.ModelRoot() diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py index 176174518..9a100986c 100644 --- a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py +++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py @@ -31,6 +31,10 @@ class FakerModelCollector(base.BaseClusterDataModelCollector): config = mock.Mock() super(FakerModelCollector, self).__init__(config) + @property + def notification_endpoints(self): + return [] + def execute(self): return self.generate_scenario_1() diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py index 9aad08e03..2aedfaff8 100644 --- a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py +++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py @@ -30,8 +30,12 @@ class FakerModelCollector(base.BaseClusterDataModelCollector): config = mock.Mock(period=777) super(FakerModelCollector, self).__init__(config) + @property + def notification_endpoints(self): + return [] + def execute(self): - return self.generate_scenario_1() + return self._cluster_data_model or self.generate_scenario_1() def generate_scenario_1(self): instances = [] diff --git a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py index bc7e7146d..aa788dd95 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py @@ -95,10 +95,12 @@ class TestUniformAirflow(base.TestCase): self.strategy.threshold_inlet_t = 22 n1, n2 = self.strategy.group_hosts_by_airflow() instance_to_mig = self.strategy.choose_instance_to_migrate(n1) + self.assertEqual(instance_to_mig[0].uuid, 'Node_0') self.assertEqual(len(instance_to_mig[1]), 1) - self.assertEqual(instance_to_mig[1][0].uuid, - "cae81432-1631-4d4e-b29c-6f3acdcde906") + self.assertIn(instance_to_mig[1][0].uuid, + {'cae81432-1631-4d4e-b29c-6f3acdcde906', + '73b09e16-35b7-4922-804e-e8f5d9b740fc'}) def test_choose_instance_to_migrate_all(self): model = self.fake_cluster.generate_scenario_7_with_2_nodes() @@ -107,10 +109,12 @@ class TestUniformAirflow(base.TestCase): self.strategy.threshold_inlet_t = 25 n1, n2 = self.strategy.group_hosts_by_airflow() instance_to_mig = self.strategy.choose_instance_to_migrate(n1) + self.assertEqual(instance_to_mig[0].uuid, 'Node_0') self.assertEqual(len(instance_to_mig[1]), 2) - self.assertEqual(instance_to_mig[1][1].uuid, - "73b09e16-35b7-4922-804e-e8f5d9b740fc") + self.assertEqual({'cae81432-1631-4d4e-b29c-6f3acdcde906', + '73b09e16-35b7-4922-804e-e8f5d9b740fc'}, + {inst.uuid for inst in instance_to_mig[1]}) def test_choose_instance_notfound(self): model = self.fake_cluster.generate_scenario_7_with_2_nodes() @@ -132,10 +136,12 @@ class TestUniformAirflow(base.TestCase): instance_to_mig = self.strategy.choose_instance_to_migrate(n1) dest_hosts = self.strategy.filter_destination_hosts( n2, instance_to_mig[1]) + self.assertEqual(len(dest_hosts), 1) self.assertEqual(dest_hosts[0]['node'].uuid, 'Node_1') - self.assertEqual(dest_hosts[0]['instance'].uuid, - 'cae81432-1631-4d4e-b29c-6f3acdcde906') + self.assertIn(instance_to_mig[1][0].uuid, + {'cae81432-1631-4d4e-b29c-6f3acdcde906', + '73b09e16-35b7-4922-804e-e8f5d9b740fc'}) def test_exception_model(self): self.m_model.return_value = None