Merge "Use threadpool when building compute data model"
This commit is contained in:
@@ -16,6 +16,8 @@
|
||||
import os_resource_classes as orc
|
||||
from oslo_log import log
|
||||
|
||||
from futurist import waiters
|
||||
|
||||
from watcher.common import nova_helper
|
||||
from watcher.common import placement_helper
|
||||
from watcher.decision_engine.model.collector import base
|
||||
@@ -23,6 +25,7 @@ from watcher.decision_engine.model import element
|
||||
from watcher.decision_engine.model import model_root
|
||||
from watcher.decision_engine.model.notification import nova
|
||||
from watcher.decision_engine.scope import compute as compute_scope
|
||||
from watcher.decision_engine import threading
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@@ -212,8 +215,12 @@ class NovaModelBuilder(base.BaseModelBuilder):
|
||||
self.nova = osc.nova()
|
||||
self.nova_helper = nova_helper.NovaHelper(osc=self.osc)
|
||||
self.placement_helper = placement_helper.PlacementHelper(osc=self.osc)
|
||||
self.executor = threading.DecisionEngineThreadPool()
|
||||
|
||||
def _collect_aggregates(self, host_aggregates, _nodes):
|
||||
if not host_aggregates:
|
||||
return
|
||||
|
||||
aggregate_list = self.call_retry(f=self.nova_helper.get_aggregate_list)
|
||||
aggregate_ids = [aggregate['id'] for aggregate
|
||||
in host_aggregates if 'id' in aggregate]
|
||||
@@ -229,6 +236,9 @@ class NovaModelBuilder(base.BaseModelBuilder):
|
||||
_nodes.update(aggregate.hosts)
|
||||
|
||||
def _collect_zones(self, availability_zones, _nodes):
|
||||
if not availability_zones:
|
||||
return
|
||||
|
||||
service_list = self.call_retry(f=self.nova_helper.get_service_list)
|
||||
zone_names = [zone['name'] for zone
|
||||
in availability_zones]
|
||||
@@ -239,20 +249,71 @@ class NovaModelBuilder(base.BaseModelBuilder):
|
||||
if service.zone in zone_names or include_all_nodes:
|
||||
_nodes.add(service.host)
|
||||
|
||||
def _add_physical_layer(self):
|
||||
"""Add the physical layer of the graph.
|
||||
def _compute_node_future(self, future, future_instances):
|
||||
"""Add compute node information to model and schedule instance info job
|
||||
|
||||
This includes components which represent actual infrastructure
|
||||
hardware.
|
||||
:param future: The future from the finished execution
|
||||
:rtype future: :py:class:`futurist.GreenFuture`
|
||||
:param future_instances: list of futures for instance jobs
|
||||
:rtype future_instances: list :py:class:`futurist.GreenFuture`
|
||||
"""
|
||||
try:
|
||||
node_info = future.result()[0]
|
||||
|
||||
# filter out baremetal node
|
||||
if node_info.hypervisor_type == 'ironic':
|
||||
LOG.debug("filtering out baremetal node: %s", node_info)
|
||||
return
|
||||
self.add_compute_node(node_info)
|
||||
# node.servers is a list of server objects
|
||||
# New in nova version 2.53
|
||||
instances = getattr(node_info, "servers", None)
|
||||
# Do not submit job if there are no instances on compute node
|
||||
if instances is None:
|
||||
LOG.info("No instances on compute_node: {0}".format(node_info))
|
||||
return
|
||||
future_instances.append(
|
||||
self.executor.submit(
|
||||
self.add_instance_node, node_info, instances)
|
||||
)
|
||||
except Exception:
|
||||
LOG.error("compute node from aggregate / "
|
||||
"availability_zone could not be found")
|
||||
|
||||
def _add_physical_layer(self):
|
||||
"""Collects all information on compute nodes and instances
|
||||
|
||||
Will collect all required compute node and instance information based
|
||||
on the host aggregates and availability zones. If aggregates and zones
|
||||
do not specify any compute nodes all nodes are retrieved instead.
|
||||
|
||||
The collection of information happens concurrently using the
|
||||
DecisionEngineThreadpool. The collection is parallelized in three steps
|
||||
first information about aggregates and zones is gathered. Secondly,
|
||||
for each of the compute nodes a tasks is submitted to get detailed
|
||||
information about the compute node. Finally, Each of these submitted
|
||||
tasks will submit an additional task if the compute node contains
|
||||
instances. Before returning from this function all instance tasks are
|
||||
waited upon to complete.
|
||||
"""
|
||||
|
||||
compute_nodes = set()
|
||||
host_aggregates = self.model_scope.get("host_aggregates")
|
||||
availability_zones = self.model_scope.get("availability_zones")
|
||||
if host_aggregates:
|
||||
self._collect_aggregates(host_aggregates, compute_nodes)
|
||||
if availability_zones:
|
||||
self._collect_zones(availability_zones, compute_nodes)
|
||||
|
||||
"""Submit tasks to gather compute nodes from availability zones and
|
||||
host aggregates. Each task adds compute nodes to the set, this set is
|
||||
threadsafe under the assumption that CPython is used with the GIL
|
||||
enabled."""
|
||||
zone_aggregate_futures = {
|
||||
self.executor.submit(
|
||||
self._collect_aggregates, host_aggregates, compute_nodes),
|
||||
self.executor.submit(
|
||||
self._collect_zones, availability_zones, compute_nodes)
|
||||
}
|
||||
waiters.wait_for_all(zone_aggregate_futures)
|
||||
|
||||
# if zones and aggregates did not contain any nodes get every node.
|
||||
if not compute_nodes:
|
||||
self.no_model_scope_flag = True
|
||||
all_nodes = self.call_retry(
|
||||
@@ -260,24 +321,20 @@ class NovaModelBuilder(base.BaseModelBuilder):
|
||||
compute_nodes = set(
|
||||
[node.hypervisor_hostname for node in all_nodes])
|
||||
LOG.debug("compute nodes: %s", compute_nodes)
|
||||
for node_name in compute_nodes:
|
||||
cnode = self.call_retry(
|
||||
self.nova_helper.get_compute_node_by_name,
|
||||
node_name, servers=True, detailed=True)
|
||||
if cnode:
|
||||
node_info = cnode[0]
|
||||
# filter out baremetal node
|
||||
if node_info.hypervisor_type == 'ironic':
|
||||
LOG.debug("filtering out baremetal node: %s", node_name)
|
||||
continue
|
||||
self.add_compute_node(node_info)
|
||||
# node.servers is a list of server objects
|
||||
# New in nova version 2.53
|
||||
instances = getattr(node_info, "servers", None)
|
||||
self.add_instance_node(node_info, instances)
|
||||
else:
|
||||
LOG.error("compute_node from aggregate / availability_zone "
|
||||
"could not be found: {0}".format(node_name))
|
||||
|
||||
node_futures = [self.executor.submit(
|
||||
self.nova_helper.get_compute_node_by_name,
|
||||
node, servers=True, detailed=True)
|
||||
for node in compute_nodes]
|
||||
LOG.debug("submitted {0} jobs".format(len(compute_nodes)))
|
||||
|
||||
# Futures will concurrently be added, only safe with CPython GIL
|
||||
future_instances = []
|
||||
self.executor.do_while_futures_modify(
|
||||
node_futures, self._compute_node_future, future_instances)
|
||||
|
||||
# Wait for all instance jobs to finish
|
||||
waiters.wait_for_all(future_instances)
|
||||
|
||||
def add_compute_node(self, node):
|
||||
# Build and add base node.
|
||||
|
||||
Reference in New Issue
Block a user