Files
watcher/watcher/common/service.py
Matt Riedemann 5c2939f23b Remove watcher.openstack.common=WARN from _DEFAULT_LOG_LEVELS
Hard-coding watcher.openstack.common to warning level logging
only makes it hard to debug watcher's interactions with other
services, like when it's triggering and monitoring a server live
migration.

Since debug logging is controlled via the "debug" configuration
option, we can just rely on that to filter out debug logs within
watcher itself.

Note this has been this way since change
I699e0ab082657880998d8618fe29eb7f56c6c661 back in 2015 and there
was no explanation why the watcher.openstack.common logging
was set to WARN level.

Change-Id: I939403a4ae36d1aa9ea83badb9404bc37d18a1a6
Related-Bug: #1828598
2019-05-10 11:29:08 -04:00

308 lines
10 KiB
Python

# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import socket
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import _options
from oslo_log import log
import oslo_messaging as om
from oslo_reports import guru_meditation_report as gmr
from oslo_reports import opts as gmr_opts
from oslo_service import service
from oslo_service import wsgi
from oslo_messaging.rpc import dispatcher
from watcher._i18n import _
from watcher.api import app
from watcher.common import config
from watcher.common import context
from watcher.common import rpc
from watcher.common import scheduling
from watcher.conf import plugins as plugins_conf
from watcher import objects
from watcher.objects import base
from watcher.objects import fields as wfields
from watcher import version
NOTIFICATION_OPTS = [
cfg.StrOpt('notification_level',
choices=[''] + list(wfields.NotificationPriority.ALL),
default=wfields.NotificationPriority.INFO,
help=_('Specifies the minimum level for which to send '
'notifications. If not set, no notifications will '
'be sent. The default is for this option to be at the '
'`INFO` level.'))
]
cfg.CONF.register_opts(NOTIFICATION_OPTS)
CONF = cfg.CONF
LOG = log.getLogger(__name__)
_DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'qpid.messaging=INFO',
'oslo.messaging=INFO', 'sqlalchemy=WARN',
'keystoneclient=INFO', 'stevedore=INFO',
'eventlet.wsgi.server=WARN', 'iso8601=WARN',
'requests=WARN', 'neutronclient=WARN',
'glanceclient=WARN',
'apscheduler=WARN']
Singleton = service.Singleton
class WSGIService(service.ServiceBase):
"""Provides ability to launch Watcher API from wsgi app."""
def __init__(self, service_name, use_ssl=False):
"""Initialize, but do not start the WSGI server.
:param service_name: The service name of the WSGI server.
:param use_ssl: Wraps the socket in an SSL context if True.
"""
self.service_name = service_name
self.app = app.VersionSelectorApplication()
self.workers = (CONF.api.workers or
processutils.get_worker_count())
self.server = wsgi.Server(CONF, self.service_name, self.app,
host=CONF.api.host,
port=CONF.api.port,
use_ssl=use_ssl,
logger_name=self.service_name)
def start(self):
"""Start serving this service using loaded configuration"""
self.server.start()
def stop(self):
"""Stop serving this API"""
self.server.stop()
def wait(self):
"""Wait for the service to stop serving this API"""
self.server.wait()
def reset(self):
"""Reset server greenpool size to default"""
self.server.reset()
class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
service_name = None
def __init__(self, gconfig=None, service_name=None, **kwargs):
gconfig = None or {}
super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
ServiceHeartbeat.service_name = service_name
self.context = context.make_context()
self.send_beat()
def send_beat(self):
host = CONF.host
watcher_list = objects.Service.list(
self.context, filters={'name': ServiceHeartbeat.service_name,
'host': host})
if watcher_list:
watcher_service = watcher_list[0]
watcher_service.last_seen_up = datetime.datetime.utcnow()
watcher_service.save()
else:
watcher_service = objects.Service(self.context)
watcher_service.name = ServiceHeartbeat.service_name
watcher_service.host = host
watcher_service.create()
def add_heartbeat_job(self):
self.add_job(self.send_beat, 'interval', seconds=60,
next_run_time=datetime.datetime.now())
@classmethod
def get_service_name(cls):
return CONF.host, cls.service_name
def start(self):
"""Start service."""
self.add_heartbeat_job()
super(ServiceHeartbeat, self).start()
def stop(self):
"""Stop service."""
self.shutdown()
def wait(self):
"""Wait for service to complete."""
def reset(self):
"""Reset service.
Called in case service running in daemon mode receives SIGHUP.
"""
class Service(service.ServiceBase):
API_VERSION = '1.0'
def __init__(self, manager_class):
super(Service, self).__init__()
self.manager = manager_class()
self.publisher_id = self.manager.publisher_id
self.api_version = self.manager.api_version
self.conductor_topic = self.manager.conductor_topic
self.notification_topics = self.manager.notification_topics
self.heartbeat = None
self.service_name = self.manager.service_name
if self.service_name:
self.heartbeat = ServiceHeartbeat(
service_name=self.manager.service_name)
self.conductor_endpoints = [
ep(self) for ep in self.manager.conductor_endpoints
]
self.notification_endpoints = self.manager.notification_endpoints
self.serializer = rpc.RequestContextSerializer(
base.WatcherObjectSerializer())
self._transport = None
self._notification_transport = None
self._conductor_client = None
self.conductor_topic_handler = None
self.notification_handler = None
if self.conductor_topic and self.conductor_endpoints:
self.conductor_topic_handler = self.build_topic_handler(
self.conductor_topic, self.conductor_endpoints)
if self.notification_topics and self.notification_endpoints:
self.notification_handler = self.build_notification_handler(
self.notification_topics, self.notification_endpoints
)
@property
def transport(self):
if self._transport is None:
self._transport = om.get_rpc_transport(CONF)
return self._transport
@property
def notification_transport(self):
if self._notification_transport is None:
self._notification_transport = om.get_notification_transport(CONF)
return self._notification_transport
@property
def conductor_client(self):
if self._conductor_client is None:
target = om.Target(
topic=self.conductor_topic,
version=self.API_VERSION,
)
self._conductor_client = om.RPCClient(
self.transport, target, serializer=self.serializer)
return self._conductor_client
@conductor_client.setter
def conductor_client(self, c):
self.conductor_client = c
def build_topic_handler(self, topic_name, endpoints=()):
access_policy = dispatcher.DefaultRPCAccessPolicy
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
target = om.Target(
topic=topic_name,
# For compatibility, we can override it with 'host' opt
server=CONF.host or socket.gethostname(),
version=self.api_version,
)
return om.get_rpc_server(
self.transport, target, endpoints,
executor='eventlet', serializer=serializer,
access_policy=access_policy)
def build_notification_handler(self, topic_names, endpoints=()):
serializer = rpc.RequestContextSerializer(rpc.JsonPayloadSerializer())
targets = []
for topic in topic_names:
kwargs = {}
if '.' in topic:
exchange, topic = topic.split('.')
kwargs['exchange'] = exchange
kwargs['topic'] = topic
targets.append(om.Target(**kwargs))
return om.get_notification_listener(
self.notification_transport, targets, endpoints,
executor='eventlet', serializer=serializer,
allow_requeue=False, pool=CONF.host)
def start(self):
LOG.debug("Connecting to '%s'", CONF.transport_url)
if self.conductor_topic_handler:
self.conductor_topic_handler.start()
if self.notification_handler:
self.notification_handler.start()
if self.heartbeat:
self.heartbeat.start()
def stop(self):
LOG.debug("Disconnecting from '%s'", CONF.transport_url)
if self.conductor_topic_handler:
self.conductor_topic_handler.stop()
if self.notification_handler:
self.notification_handler.stop()
if self.heartbeat:
self.heartbeat.stop()
def reset(self):
"""Reset a service in case it received a SIGHUP."""
def wait(self):
"""Wait for service to complete."""
def check_api_version(self, ctx):
api_manager_version = self.conductor_client.call(
ctx, 'check_api_version', api_version=self.api_version)
return api_manager_version
def launch(conf, service_, workers=1, restart_method='mutate'):
return service.launch(conf, service_, workers, restart_method)
def prepare_service(argv=(), conf=cfg.CONF):
log.register_options(conf)
gmr_opts.set_defaults(conf)
config.parse_args(argv)
cfg.set_defaults(_options.log_opts,
default_log_levels=_DEFAULT_LOG_LEVELS)
log.setup(conf, 'python-watcher')
conf.log_opt_values(LOG, log.DEBUG)
objects.register_all()
gmr.TextGuruMeditation.register_section(
_('Plugins'), plugins_conf.show_plugins)
gmr.TextGuruMeditation.setup_autorun(version, conf=conf)