# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import socket

import eventlet
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import _options
from oslo_log import log
import oslo_messaging as om
from oslo_reports import guru_meditation_report as gmr
from oslo_reports import opts as gmr_opts
from oslo_service import service
from oslo_service import wsgi

from watcher._i18n import _
from watcher.api import app
from watcher.common import config
from watcher.common import context
from watcher.common import rpc
from watcher.common import scheduling
from watcher import objects
from watcher.objects import base
from watcher import opts
from watcher import version

# NOTE:
# Ubuntu 14.04 forces librabbitmq when kombu is used
# Unfortunately it forces a version that has a crash
# bug.  Calling eventlet.monkey_patch() tells kombu
# to use libamqp instead.
eventlet.monkey_patch()

service_opts = [
    cfg.IntOpt('periodic_interval',
               default=60,
               help=_('Seconds between running periodic tasks.')),
    cfg.StrOpt('host',
               default=socket.getfqdn(),
               help=_('Name of this node. This can be an opaque identifier. '
                      'It is not necessarily a hostname, FQDN, or IP address. '
                      'However, the node name must be valid within '
                      'an AMQP key, and if using ZeroMQ, a valid '
                      'hostname, FQDN, or IP address.')),
    cfg.IntOpt('service_down_time',
               default=90,
               help=_('Maximum time since last check-in for up service.'))
]

cfg.CONF.register_opts(service_opts)

CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Default per-library log levels handed to oslo.log in prepare_service().
_DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'qpid.messaging=INFO',
                       'oslo.messaging=INFO', 'sqlalchemy=WARN',
                       'keystoneclient=INFO', 'stevedore=INFO',
                       'eventlet.wsgi.server=WARN', 'iso8601=WARN',
                       'paramiko=WARN', 'requests=WARN', 'neutronclient=WARN',
                       'glanceclient=WARN', 'watcher.openstack.common=WARN']

Singleton = service.Singleton


class WSGIService(service.ServiceBase):
    """Provides ability to launch Watcher API from wsgi app."""

    def __init__(self, service_name, use_ssl=False):
        """Initialize, but do not start the WSGI server.

        :param service_name: The service name of the WSGI server.
        :param use_ssl: Wraps the socket in an SSL context if True.
        """
        self.service_name = service_name
        self.app = app.VersionSelectorApplication()
        # Fall back to the processutils worker count when no explicit
        # worker count is configured.
        self.workers = (CONF.api.workers or
                        processutils.get_worker_count())
        self.server = wsgi.Server(CONF, self.service_name, self.app,
                                  host=CONF.api.host,
                                  port=CONF.api.port,
                                  use_ssl=use_ssl,
                                  logger_name=self.service_name)

    def start(self):
        """Start serving this service using loaded configuration"""
        self.server.start()

    def stop(self):
        """Stop serving this API"""
        self.server.stop()

    def wait(self):
        """Wait for the service to stop serving this API"""
        self.server.wait()

    def reset(self):
        """Reset server greenpool size to default"""
        self.server.reset()


class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
    """Scheduler service that periodically records this service as alive.

    Each beat looks up the ``objects.Service`` record matching this
    service name and ``CONF.host``; the record is refreshed when it exists
    and created otherwise.
    """

    def __init__(self, gconfig=None, service_name=None, **kwargs):
        """Initialize the heartbeat scheduler without starting it.

        :param gconfig: Optional scheduler configuration forwarded to the
                        parent scheduler service; an empty dict is used
                        when omitted.
        :param service_name: Name under which the service is recorded.
        :param kwargs: Extra keyword arguments forwarded to the parent.
        """
        # Honour the caller-supplied configuration; only fall back to an
        # empty dict when nothing (or something falsy) was given.
        gconfig = gconfig or {}
        super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
        self.service_name = service_name
        self.context = context.make_context()

    def send_beat(self):
        """Update (or create) the Service record for this name and host."""
        host = CONF.host
        watcher_list = objects.Service.list(
            self.context, filters={'name': self.service_name,
                                   'host': host})
        if watcher_list:
            # Record already exists: just refresh its last-seen timestamp.
            watcher_service = watcher_list[0]
            watcher_service.last_seen_up = datetime.datetime.utcnow()
            watcher_service.save()
        else:
            # First beat for this (name, host) pair: create the record.
            watcher_service = objects.Service(self.context)
            watcher_service.name = self.service_name
            watcher_service.host = host
            watcher_service.create()

    def add_heartbeat_job(self):
        """Schedule send_beat every 60 seconds, first run immediately."""
        self.add_job(self.send_beat, 'interval', seconds=60,
                     next_run_time=datetime.datetime.now())

    def start(self):
        """Start service."""
        self.add_heartbeat_job()
        super(ServiceHeartbeat, self).start()

    def stop(self):
        """Stop service."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete."""

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """


class Service(service.ServiceBase):
    """Messaging service built around a manager object.

    The manager supplies topics, endpoints and naming. From those this
    class builds RPC servers for the conductor and status topics, a
    notification listener, lazily created RPC clients, and an optional
    heartbeat.
    """

    API_VERSION = '1.0'

    def __init__(self, manager_class):
        """Instantiate the manager and build the messaging handlers.

        :param manager_class: Callable returning the manager object. The
                              manager must expose ``publisher_id``,
                              ``api_version``, ``conductor_topic``,
                              ``status_topic``, ``notification_topics``,
                              ``conductor_endpoints``,
                              ``status_endpoints``,
                              ``notification_endpoints`` and
                              ``service_name``.
        """
        super(Service, self).__init__()
        self.manager = manager_class()

        self.publisher_id = self.manager.publisher_id
        self.api_version = self.manager.api_version

        self.conductor_topic = self.manager.conductor_topic
        self.status_topic = self.manager.status_topic
        self.notification_topics = self.manager.notification_topics

        # Conductor endpoints are instantiated with this service, status
        # endpoints with the publisher id; notification endpoints are used
        # exactly as the manager provides them.
        self.conductor_endpoints = [
            ep(self) for ep in self.manager.conductor_endpoints
        ]
        self.status_endpoints = [
            ep(self.publisher_id) for ep in self.manager.status_endpoints
        ]
        self.notification_endpoints = self.manager.notification_endpoints

        self.serializer = rpc.RequestContextSerializer(
            base.WatcherObjectSerializer())

        # Transports and clients are created on first access through the
        # properties below.
        self._transport = None
        self._notification_transport = None
        self._conductor_client = None
        self._status_client = None

        self.conductor_topic_handler = None
        self.status_topic_handler = None
        self.notification_handler = None

        self.heartbeat = None

        # A handler is only built when both its topic(s) and endpoints
        # are non-empty.
        if self.conductor_topic and self.conductor_endpoints:
            self.conductor_topic_handler = self.build_topic_handler(
                self.conductor_topic, self.conductor_endpoints)
        if self.status_topic and self.status_endpoints:
            self.status_topic_handler = self.build_topic_handler(
                self.status_topic, self.status_endpoints)
        if self.notification_topics and self.notification_endpoints:
            self.notification_handler = self.build_notification_handler(
                self.notification_topics, self.notification_endpoints
            )
        self.service_name = self.manager.service_name
        if self.service_name:
            self.heartbeat = ServiceHeartbeat(
                service_name=self.manager.service_name)

    @property
    def transport(self):
        """RPC transport, created from CONF on first access."""
        if self._transport is None:
            self._transport = om.get_transport(CONF)
        return self._transport

    @property
    def notification_transport(self):
        """Notification transport, created from CONF on first access."""
        if self._notification_transport is None:
            self._notification_transport = om.get_notification_transport(CONF)
        return self._notification_transport

    @property
    def conductor_client(self):
        """RPC client for the conductor topic, created on first access."""
        if self._conductor_client is None:
            target = om.Target(
                topic=self.conductor_topic,
                version=self.API_VERSION,
            )
            self._conductor_client = om.RPCClient(
                self.transport, target, serializer=self.serializer)
        return self._conductor_client

    @conductor_client.setter
    def conductor_client(self, c):
        # Store on the backing attribute; assigning to the property itself
        # would re-enter this setter and recurse without end.
        self._conductor_client = c

    @property
    def status_client(self):
        """RPC client for the status topic, created on first access."""
        if self._status_client is None:
            target = om.Target(
                topic=self.status_topic,
                version=self.API_VERSION,
            )
            self._status_client = om.RPCClient(
                self.transport, target, serializer=self.serializer)
        return self._status_client

    @status_client.setter
    def status_client(self, c):
        # Store on the backing attribute; assigning to the property itself
        # would re-enter this setter and recurse without end.
        self._status_client = c

    def build_topic_handler(self, topic_name, endpoints=()):
        """Build an eventlet RPC server for ``topic_name``.

        :param topic_name: Topic the server's target is bound to.
        :param endpoints: Endpoint objects passed to the RPC server.
        :returns: The object returned by ``om.get_rpc_server``.
        """
        serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
        target = om.Target(
            topic=topic_name,
            # For compatibility, we can override it with 'host' opt
            server=CONF.host or socket.getfqdn(),
            version=self.api_version,
        )
        return om.get_rpc_server(
            self.transport, target, endpoints,
            executor='eventlet', serializer=serializer)

    def build_notification_handler(self, topic_names, endpoints=()):
        """Build an eventlet notification listener for ``topic_names``.

        :param topic_names: Iterable of topics; one target is made for each.
        :param endpoints: Endpoint objects passed to the listener.
        :returns: The object returned by ``om.get_notification_listener``.
        """
        serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
        targets = [om.Target(topic=topic_name) for topic_name in topic_names]
        return om.get_notification_listener(
            self.notification_transport, targets, endpoints,
            executor='eventlet', serializer=serializer,
            allow_requeue=False)

    def start(self):
        """Start every handler that was built, then the heartbeat."""
        LOG.debug("Connecting to '%s' (%s)",
                  CONF.transport_url, CONF.rpc_backend)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.start()
        if self.status_topic_handler:
            self.status_topic_handler.start()
        if self.notification_handler:
            self.notification_handler.start()
        if self.heartbeat:
            self.heartbeat.start()

    def stop(self):
        """Stop every handler that was built, then the heartbeat."""
        LOG.debug("Disconnecting from '%s' (%s)",
                  CONF.transport_url, CONF.rpc_backend)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.stop()
        if self.status_topic_handler:
            self.status_topic_handler.stop()
        if self.notification_handler:
            self.notification_handler.stop()
        if self.heartbeat:
            self.heartbeat.stop()

    def reset(self):
        """Reset a service in case it received a SIGHUP."""

    def wait(self):
        """Wait for service to complete."""

    def check_api_version(self, ctx):
        """Invoke 'check_api_version' on the conductor via RPC.

        :param ctx: Request context passed to the RPC call.
        :returns: Whatever the remote 'check_api_version' call returns.
        """
        api_manager_version = self.conductor_client.call(
            ctx, 'check_api_version', api_version=self.api_version)
        return api_manager_version


def launch(conf, service_, workers=1, restart_method='reload'):
    """Launch ``service_`` through ``oslo_service.service.launch``.

    :param conf: Configuration object handed to the launcher.
    :param service_: Service instance to launch.
    :param workers: Number of workers passed to the launcher.
    :param restart_method: Restart method passed to the launcher.
    :returns: The launcher returned by ``service.launch``.
    """
    return service.launch(conf, service_, workers, restart_method)


def prepare_service(argv=(), conf=cfg.CONF):
    """Set up configuration, logging, objects and the meditation report.

    :param argv: Command-line arguments handed to ``config.parse_args``.
    :param conf: Configuration object to register options on and to log.
    """
    log.register_options(conf)
    gmr_opts.set_defaults(conf)

    config.parse_args(argv)
    cfg.set_defaults(_options.log_opts,
                     default_log_levels=_DEFAULT_LOG_LEVELS)
    log.setup(conf, 'python-watcher')
    conf.log_opt_values(LOG, log.DEBUG)
    objects.register_all()

    gmr.TextGuruMeditation.register_section(_('Plugins'), opts.show_plugins)
    gmr.TextGuruMeditation.setup_autorun(version, conf=conf)