diff --git a/watcher/decision_engine/model/collector/nova.py b/watcher/decision_engine/model/collector/nova.py index 6e6bc2771..5d2baee55 100644 --- a/watcher/decision_engine/model/collector/nova.py +++ b/watcher/decision_engine/model/collector/nova.py @@ -16,6 +16,8 @@ import os_resource_classes as orc from oslo_log import log +from futurist import waiters + from watcher.common import nova_helper from watcher.common import placement_helper from watcher.decision_engine.model.collector import base @@ -23,6 +25,7 @@ from watcher.decision_engine.model import element from watcher.decision_engine.model import model_root from watcher.decision_engine.model.notification import nova from watcher.decision_engine.scope import compute as compute_scope +from watcher.decision_engine import threading LOG = log.getLogger(__name__) @@ -212,8 +215,12 @@ class NovaModelBuilder(base.BaseModelBuilder): self.nova = osc.nova() self.nova_helper = nova_helper.NovaHelper(osc=self.osc) self.placement_helper = placement_helper.PlacementHelper(osc=self.osc) + self.executor = threading.DecisionEngineThreadPool() def _collect_aggregates(self, host_aggregates, _nodes): + if not host_aggregates: + return + aggregate_list = self.call_retry(f=self.nova_helper.get_aggregate_list) aggregate_ids = [aggregate['id'] for aggregate in host_aggregates if 'id' in aggregate] @@ -229,6 +236,9 @@ class NovaModelBuilder(base.BaseModelBuilder): _nodes.update(aggregate.hosts) def _collect_zones(self, availability_zones, _nodes): + if not availability_zones: + return + service_list = self.call_retry(f=self.nova_helper.get_service_list) zone_names = [zone['name'] for zone in availability_zones] @@ -239,20 +249,71 @@ class NovaModelBuilder(base.BaseModelBuilder): if service.zone in zone_names or include_all_nodes: _nodes.add(service.host) - def _add_physical_layer(self): - """Add the physical layer of the graph. 
+    def _compute_node_future(self, future, future_instances):
+        """Add compute node information to model and schedule instance info job
 
-        This includes components which represent actual infrastructure
-        hardware.
+        :param future: The future from the finished execution
+        :type future: :py:class:`futurist.GreenFuture`
+        :param future_instances: list of futures for instance jobs
+        :type future_instances: list :py:class:`futurist.GreenFuture`
         """
+        try:
+            node_info = future.result()[0]
+
+            # filter out baremetal node
+            if node_info.hypervisor_type == 'ironic':
+                LOG.debug("filtering out baremetal node: %s", node_info)
+                return
+            self.add_compute_node(node_info)
+            # node.servers is a list of server objects
+            # New in nova version 2.53
+            instances = getattr(node_info, "servers", None)
+            # Do not submit job if there are no instances on compute node
+            if instances is None:
+                LOG.info("No instances on compute_node: {0}".format(node_info))
+                return
+            future_instances.append(
+                self.executor.submit(
+                    self.add_instance_node, node_info, instances)
+            )
+        except Exception:
+            LOG.error("compute node from aggregate / "
+                      "availability_zone could not be found")
+
+    def _add_physical_layer(self):
+        """Collects all information on compute nodes and instances
+
+        Will collect all required compute node and instance information based
+        on the host aggregates and availability zones. If aggregates and zones
+        do not specify any compute nodes, all nodes are retrieved instead.
+
+        The collection of information happens concurrently using the
+        DecisionEngineThreadPool. The collection is parallelized in three
+        steps: first, information about aggregates and zones is gathered.
+        Secondly, for each of the compute nodes a task is submitted to get
+        detailed information about the compute node. Finally, each of these
+        submitted tasks will submit an additional task if the compute node
+        contains instances. Before returning from this function all instance
+        tasks are waited upon to complete.
+ """ + compute_nodes = set() host_aggregates = self.model_scope.get("host_aggregates") availability_zones = self.model_scope.get("availability_zones") - if host_aggregates: - self._collect_aggregates(host_aggregates, compute_nodes) - if availability_zones: - self._collect_zones(availability_zones, compute_nodes) + """Submit tasks to gather compute nodes from availability zones and + host aggregates. Each task adds compute nodes to the set, this set is + threadsafe under the assumption that CPython is used with the GIL + enabled.""" + zone_aggregate_futures = { + self.executor.submit( + self._collect_aggregates, host_aggregates, compute_nodes), + self.executor.submit( + self._collect_zones, availability_zones, compute_nodes) + } + waiters.wait_for_all(zone_aggregate_futures) + + # if zones and aggregates did not contain any nodes get every node. if not compute_nodes: self.no_model_scope_flag = True all_nodes = self.call_retry( @@ -260,24 +321,20 @@ class NovaModelBuilder(base.BaseModelBuilder): compute_nodes = set( [node.hypervisor_hostname for node in all_nodes]) LOG.debug("compute nodes: %s", compute_nodes) - for node_name in compute_nodes: - cnode = self.call_retry( - self.nova_helper.get_compute_node_by_name, - node_name, servers=True, detailed=True) - if cnode: - node_info = cnode[0] - # filter out baremetal node - if node_info.hypervisor_type == 'ironic': - LOG.debug("filtering out baremetal node: %s", node_name) - continue - self.add_compute_node(node_info) - # node.servers is a list of server objects - # New in nova version 2.53 - instances = getattr(node_info, "servers", None) - self.add_instance_node(node_info, instances) - else: - LOG.error("compute_node from aggregate / availability_zone " - "could not be found: {0}".format(node_name)) + + node_futures = [self.executor.submit( + self.nova_helper.get_compute_node_by_name, + node, servers=True, detailed=True) + for node in compute_nodes] + LOG.debug("submitted {0} jobs".format(len(compute_nodes))) + + # 
Futures will concurrently be added, only safe with CPython GIL + future_instances = [] + self.executor.do_while_futures_modify( + node_futures, self._compute_node_future, future_instances) + + # Wait for all instance jobs to finish + waiters.wait_for_all(future_instances) def add_compute_node(self, node): # Build and add base node. diff --git a/watcher/tests/decision_engine/cluster/test_nova_cdmc.py b/watcher/tests/decision_engine/cluster/test_nova_cdmc.py index e6399e071..5e7b78320 100644 --- a/watcher/tests/decision_engine/cluster/test_nova_cdmc.py +++ b/watcher/tests/decision_engine/cluster/test_nova_cdmc.py @@ -291,6 +291,15 @@ class TestNovaModelBuilder(base.TestCase): self.assertEqual(set(['hostone', 'hosttwo']), result) + @mock.patch.object(nova_helper, 'NovaHelper') + def test_collect_aggregates_none(self, m_nova): + """Test collect_aggregates with host_aggregates None""" + result = set() + t_nova_cluster = nova.NovaModelBuilder(mock.Mock()) + t_nova_cluster._collect_aggregates(None, result) + + self.assertEqual(set(), result) + @mock.patch.object(nova_helper, 'NovaHelper') def test_collect_zones(self, m_nova): """""" @@ -310,8 +319,35 @@ class TestNovaModelBuilder(base.TestCase): self.assertEqual(set(['hostone']), result) @mock.patch.object(nova_helper, 'NovaHelper') - def test_add_physical_layer(self, m_nova): - """""" + def test_collect_zones_none(self, m_nova): + """Test collect_zones with availability_zones None""" + result = set() + t_nova_cluster = nova.NovaModelBuilder(mock.Mock()) + t_nova_cluster._collect_zones(None, result) + + self.assertEqual(set(), result) + + @mock.patch.object(placement_helper, 'PlacementHelper') + @mock.patch.object(nova_helper, 'NovaHelper') + def test_add_physical_layer(self, m_nova, m_placement): + """Ensure all three steps of the physical layer are fully executed + + First the return value for get_aggregate_list and get_service_list are + mocked. 
These return 3 hosts of which hostone is returned by both the + aggregate and service call. This will help verify the elimination of + duplicates. The scope is setup so that only hostone and hosttwo should + remain. + + There will be 2 simulated compute nodes and 2 associated instances. + These will be returned by their matching calls in nova helper. The + calls to get_compute_node_by_name and get_instance_list are asserted + as to verify the correct operation of add_physical_layer. + """ + + mock_placement = mock.Mock(name="placement_helper") + mock_placement.get_inventories.return_value = dict() + mock_placement.get_usages_for_resource_provider.return_value = None + m_placement.return_value = mock_placement m_nova.return_value.get_aggregate_list.return_value = \ [mock.Mock(id=1, name='example'), @@ -321,7 +357,69 @@ class TestNovaModelBuilder(base.TestCase): [mock.Mock(zone='av_b', host='hostthree'), mock.Mock(zone='av_a', host='hostone')] - m_nova.return_value.get_compute_node_by_name.return_value = False + compute_node_one = mock.Mock( + id='796fee99-65dd-4262-aa-fd2a1143faa6', + hypervisor_hostname='hostone', + hypervisor_type='QEMU', + state='TEST_STATE', + status='TEST_STATUS', + memory_mb=333, + memory_mb_used=100, + free_disk_gb=222, + local_gb=111, + local_gb_used=10, + vcpus=4, + vcpus_used=0, + servers=[ + {'name': 'fake_instance', + 'uuid': 'ef500f7e-dac8-470f-960c-169486fce71b'} + ], + service={'id': 123, 'host': 'hostone', + 'disabled_reason': ''}, + ) + + compute_node_two = mock.Mock( + id='756fef99-65dd-4262-aa-fd2a1143faa6', + hypervisor_hostname='hosttwo', + hypervisor_type='QEMU', + state='TEST_STATE', + status='TEST_STATUS', + memory_mb=333, + memory_mb_used=100, + free_disk_gb=222, + local_gb=111, + local_gb_used=10, + vcpus=4, + vcpus_used=0, + servers=[ + {'name': 'fake_instance2', + 'uuid': 'ef500f7e-dac8-47f0-960c-169486fce71b'} + ], + service={'id': 123, 'host': 'hosttwo', + 'disabled_reason': ''}, + ) + + 
m_nova.return_value.get_compute_node_by_name.side_effect = [ + [compute_node_one], [compute_node_two] + ] + + fake_instance_one = mock.Mock( + id='796fee99-65dd-4262-aa-fd2a1143faa6', + name='fake_instance', + flavor={'ram': 333, 'disk': 222, 'vcpus': 4, 'id': 1}, + metadata={'hi': 'hello'}, + tenant_id='ff560f7e-dbc8-771f-960c-164482fce21b', + ) + fake_instance_two = mock.Mock( + id='ef500f7e-dac8-47f0-960c-169486fce71b', + name='fake_instance2', + flavor={'ram': 333, 'disk': 222, 'vcpus': 4, 'id': 1}, + metadata={'hi': 'hello'}, + tenant_id='756fef99-65dd-4262-aa-fd2a1143faa6', + ) + m_nova.return_value.get_instance_list.side_effect = [ + [fake_instance_one], [fake_instance_two] + ] m_scope = [{"compute": [ {"host_aggregates": [{"id": 5}]}, @@ -337,6 +435,13 @@ class TestNovaModelBuilder(base.TestCase): self.assertEqual( m_nova.return_value.get_compute_node_by_name.call_count, 2) + m_nova.return_value.get_instance_list.assert_any_call( + filters={'host': 'hostone'}, limit=1) + m_nova.return_value.get_instance_list.assert_any_call( + filters={'host': 'hosttwo'}, limit=1) + self.assertEqual( + m_nova.return_value.get_instance_list.call_count, 2) + @mock.patch.object(placement_helper, 'PlacementHelper') @mock.patch.object(nova_helper, 'NovaHelper') def test_add_physical_layer_with_baremetal_node(self, m_nova,