Files
watcher/watcher/common/service.py
Vincent Françoise cdda06c08c Removed status_topic config parameter
In this changeset, I removed the now obsolete status_topic config
option.

DocImpact
Partially Implements: blueprint watcher-notifications-ovo

Change-Id: Icfc03abd875b77fc456bfa286ac2b5774651e8fa
2016-11-16 13:57:25 +01:00

301 lines
10 KiB
Python

# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import socket
import eventlet
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import _options
from oslo_log import log
import oslo_messaging as om
from oslo_reports import guru_meditation_report as gmr
from oslo_reports import opts as gmr_opts
from oslo_service import service
from oslo_service import wsgi
from watcher._i18n import _
from watcher.api import app
from watcher.common import config
from watcher.common import context
from watcher.common import rpc
from watcher.common import scheduling
from watcher import objects
from watcher.objects import base
from watcher import opts
from watcher import version
# NOTE:
# Ubuntu 14.04 forces librabbitmq when kombu is used.
# Unfortunately the version it forces has a crash bug.
# Calling eventlet.monkey_patch() tells kombu to use
# libamqp instead.
eventlet.monkey_patch()

CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Options common to the Watcher services defined in this module; they are
# registered on the global configuration object without a group.
service_opts = [
    cfg.IntOpt(
        'periodic_interval',
        default=60,
        help=_('Seconds between running periodic tasks.')),
    cfg.StrOpt(
        'host',
        default=socket.getfqdn(),
        help=_('Name of this node. This can be an opaque identifier. '
               'It is not necessarily a hostname, FQDN, or IP address. '
               'However, the node name must be valid within '
               'an AMQP key, and if using ZeroMQ, a valid '
               'hostname, FQDN, or IP address.')),
    cfg.IntOpt(
        'service_down_time',
        default=90,
        help=_('Maximum time since last check-in for up service.')),
]
CONF.register_opts(service_opts)

# Default per-logger levels handed to oslo.log by prepare_service().
_DEFAULT_LOG_LEVELS = [
    'amqp=WARN',
    'amqplib=WARN',
    'qpid.messaging=INFO',
    'oslo.messaging=INFO',
    'sqlalchemy=WARN',
    'keystoneclient=INFO',
    'stevedore=INFO',
    'eventlet.wsgi.server=WARN',
    'iso8601=WARN',
    'paramiko=WARN',
    'requests=WARN',
    'neutronclient=WARN',
    'glanceclient=WARN',
    'watcher.openstack.common=WARN',
]

# Re-exported so callers can reach it through this module.
Singleton = service.Singleton
class WSGIService(service.ServiceBase):
    """Service wrapper that runs the Watcher API as a WSGI server."""

    def __init__(self, service_name, use_ssl=False):
        """Build the WSGI server without starting it.

        :param service_name: Name given to the WSGI server; it is also used
                             as the server's logger name.
        :param use_ssl: When True, the socket is wrapped in an SSL context.
        """
        self.service_name = service_name
        self.app = app.VersionSelectorApplication()

        # Use the configured number of API workers, falling back to the
        # count reported by processutils when none is configured.
        configured_workers = CONF.api.workers
        if configured_workers:
            self.workers = configured_workers
        else:
            self.workers = processutils.get_worker_count()

        server_options = dict(
            host=CONF.api.host,
            port=CONF.api.port,
            use_ssl=use_ssl,
            logger_name=self.service_name,
        )
        self.server = wsgi.Server(
            CONF, self.service_name, self.app, **server_options)

    def start(self):
        """Begin serving the API with the loaded configuration."""
        self.server.start()

    def stop(self):
        """Stop serving the API."""
        self.server.stop()

    def wait(self):
        """Block until the underlying server has stopped serving."""
        self.server.wait()

    def reset(self):
        """Reset the server greenpool size to its default."""
        self.server.reset()
class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
    """Background scheduler that periodically records a service as alive.

    Each beat looks up the ``objects.Service`` record matching this service
    name and the configured host, and either refreshes its ``last_seen_up``
    timestamp or creates the record when it does not exist yet.
    """

    def __init__(self, gconfig=None, service_name=None, **kwargs):
        """Initialize the heartbeat scheduler.

        :param gconfig: Optional scheduler configuration forwarded to the
                        parent scheduler service; an empty dict is used when
                        it is omitted or falsy.
        :param service_name: Name under which the heartbeat is recorded.
        :param kwargs: Extra keyword arguments forwarded to the parent class.
        """
        # Fall back to an empty configuration only when none was supplied,
        # so that a caller-provided gconfig actually reaches the scheduler.
        gconfig = gconfig or {}
        super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
        self.service_name = service_name
        self.context = context.make_context()

    def send_beat(self):
        """Record one heartbeat for this service on the configured host."""
        host = CONF.host
        watcher_list = objects.Service.list(
            self.context, filters={'name': self.service_name,
                                   'host': host})
        if watcher_list:
            # Known service: only refresh its last-seen timestamp.
            watcher_service = watcher_list[0]
            watcher_service.last_seen_up = datetime.datetime.utcnow()
            watcher_service.save()
        else:
            # First beat for this (name, host) pair: create the record.
            watcher_service = objects.Service(self.context)
            watcher_service.name = self.service_name
            watcher_service.host = host
            watcher_service.create()

    def add_heartbeat_job(self):
        """Schedule ``send_beat`` as an interval job firing every 60 seconds.

        ``next_run_time`` is set to the current time, presumably so that the
        first beat is sent right away rather than after a full interval.
        """
        self.add_job(self.send_beat, 'interval', seconds=60,
                     next_run_time=datetime.datetime.now())

    def start(self):
        """Start service."""
        self.add_heartbeat_job()
        super(ServiceHeartbeat, self).start()

    def stop(self):
        """Stop service."""
        self.shutdown()

    def wait(self):
        """Wait for service to complete."""

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """
class Service(service.ServiceBase):
    """Messaging service built around a manager object.

    The manager supplies the topics, endpoints, API version and service name.
    From those, this class builds an RPC server for the conductor topic, a
    notification listener, and a heartbeat scheduler, each only when the
    manager provides the corresponding settings.
    """

    API_VERSION = '1.0'

    def __init__(self, manager_class):
        """Instantiate the manager and build the handlers it asks for.

        :param manager_class: Callable returning the manager; it is invoked
                              without arguments.
        """
        super(Service, self).__init__()
        self.manager = manager_class()

        self.publisher_id = self.manager.publisher_id
        self.api_version = self.manager.api_version

        self.conductor_topic = self.manager.conductor_topic
        self.notification_topics = self.manager.notification_topics

        # Conductor endpoints are given as callables and are instantiated
        # with this service; notification endpoints are used as provided.
        self.conductor_endpoints = [
            ep(self) for ep in self.manager.conductor_endpoints
        ]
        self.notification_endpoints = self.manager.notification_endpoints

        self.serializer = rpc.RequestContextSerializer(
            base.WatcherObjectSerializer())

        # Transports and the RPC client are created lazily by the
        # properties defined further down.
        self._transport = None
        self._notification_transport = None
        self._conductor_client = None

        self.conductor_topic_handler = None
        self.notification_handler = None
        self.heartbeat = None

        if self.conductor_topic and self.conductor_endpoints:
            self.conductor_topic_handler = self.build_topic_handler(
                self.conductor_topic, self.conductor_endpoints)
        if self.notification_topics and self.notification_endpoints:
            self.notification_handler = self.build_notification_handler(
                self.notification_topics, self.notification_endpoints
            )

        self.service_name = self.manager.service_name
        if self.service_name:
            self.heartbeat = ServiceHeartbeat(
                service_name=self.manager.service_name)

    @property
    def transport(self):
        """Messaging transport, created on first access."""
        if self._transport is None:
            self._transport = om.get_transport(CONF)
        return self._transport

    @property
    def notification_transport(self):
        """Notification transport, created on first access."""
        if self._notification_transport is None:
            self._notification_transport = om.get_notification_transport(CONF)
        return self._notification_transport

    @property
    def conductor_client(self):
        """RPC client targeting the conductor topic, created on first access."""
        if self._conductor_client is None:
            target = om.Target(
                topic=self.conductor_topic,
                version=self.API_VERSION,
            )
            self._conductor_client = om.RPCClient(
                self.transport, target, serializer=self.serializer)
        return self._conductor_client

    @conductor_client.setter
    def conductor_client(self, c):
        # Store on the backing attribute: assigning to the property itself
        # would re-enter this setter and recurse without end.
        self._conductor_client = c

    def build_topic_handler(self, topic_name, endpoints=()):
        """Build an RPC server listening on ``topic_name``.

        :param topic_name: Topic the server listens on.
        :param endpoints: Endpoint objects exposed by the server.
        :returns: The server returned by ``om.get_rpc_server``, configured
                  with the eventlet executor.
        """
        serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
        target = om.Target(
            topic=topic_name,
            # For compatibility, we can override it with 'host' opt
            server=CONF.host or socket.getfqdn(),
            version=self.api_version,
        )
        return om.get_rpc_server(
            self.transport, target, endpoints,
            executor='eventlet', serializer=serializer)

    def build_notification_handler(self, topic_names, endpoints=()):
        """Build a notification listener covering ``topic_names``.

        :param topic_names: Iterable of topics; one target is built per topic.
        :param endpoints: Endpoint objects that receive the notifications.
        :returns: The listener returned by ``om.get_notification_listener``,
                  configured with the eventlet executor and requeueing
                  disabled.
        """
        serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
        targets = [om.Target(topic=topic_name) for topic_name in topic_names]
        return om.get_notification_listener(
            self.notification_transport, targets, endpoints,
            executor='eventlet', serializer=serializer,
            allow_requeue=False)

    def start(self):
        """Start every handler that was built, then the heartbeat."""
        LOG.debug("Connecting to '%s' (%s)",
                  CONF.transport_url, CONF.rpc_backend)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.start()
        if self.notification_handler:
            self.notification_handler.start()
        if self.heartbeat:
            self.heartbeat.start()

    def stop(self):
        """Stop every handler that was built, then the heartbeat."""
        LOG.debug("Disconnecting from '%s' (%s)",
                  CONF.transport_url, CONF.rpc_backend)
        if self.conductor_topic_handler:
            self.conductor_topic_handler.stop()
        if self.notification_handler:
            self.notification_handler.stop()
        if self.heartbeat:
            self.heartbeat.stop()

    def reset(self):
        """Reset a service in case it received a SIGHUP."""

    def wait(self):
        """Wait for service to complete."""

    def check_api_version(self, ctx):
        """Call ``check_api_version`` on the conductor over RPC.

        :param ctx: Request context passed to the RPC call.
        :returns: The value returned by the remote ``check_api_version``.
        """
        api_manager_version = self.conductor_client.call(
            ctx, 'check_api_version', api_version=self.api_version)
        return api_manager_version
def launch(conf, service_, workers=1, restart_method='reload'):
    """Hand a service over to the oslo.service launcher.

    :param conf: Configuration object passed on unchanged.
    :param service_: Service instance to launch.
    :param workers: Number of workers, passed on unchanged.
    :param restart_method: Restart method, passed on unchanged.
    :returns: Whatever ``service.launch`` returns.
    """
    launcher = service.launch(conf, service_, workers, restart_method)
    return launcher
def prepare_service(argv=(), conf=cfg.CONF):
    """Set up configuration, logging, objects and Guru Meditation Reports.

    The steps run in a fixed order: option registration, argument parsing,
    logging setup, object registration, then report setup.

    :param argv: Command-line arguments handed to ``config.parse_args``.
    :param conf: Configuration object to set up; defaults to the global
                 ``cfg.CONF``.
    """
    # Option registration and command-line parsing.
    log.register_options(conf)
    gmr_opts.set_defaults(conf)
    config.parse_args(argv)

    # Logging: apply the module's default log levels, then configure
    # logging and dump the option values at debug level.
    cfg.set_defaults(
        _options.log_opts, default_log_levels=_DEFAULT_LOG_LEVELS)
    log.setup(conf, 'python-watcher')
    conf.log_opt_values(LOG, log.DEBUG)

    objects.register_all()

    # Guru Meditation Report: add a section listing the plugins and
    # enable automatic report generation.
    guru_report = gmr.TextGuruMeditation
    guru_report.register_section(_('Plugins'), opts.show_plugins)
    guru_report.setup_autorun(version, conf=conf)