From 30bdf29002f2b518701fb28e4234c4cc20e69f84 Mon Sep 17 00:00:00 2001
From: junjie huang
Date: Sun, 13 Mar 2016 18:02:27 +0000
Subject: [PATCH] Workload balance migration strategy implementation

This is one of the algorithms of the Intel thermal PoC. It is based on
the VM workloads of hypervisors.

Change-Id: I45ab0cf0f05786e6f68025bdd315f38381900a68
blueprint: workload-balance-migration-strategy
---
 setup.cfg                                          |   1 +
 .../strategy/strategies/workload_balance.py        | 324 ++++++++++++++++++
 .../strategies/faker_cluster_state.py              |  70 ++++
 .../strategies/faker_metrics_collector.py          |  28 +-
 .../strategies/test_workload_balance.py            | 150 ++++++++
 5 files changed, 572 insertions(+), 1 deletion(-)
 create mode 100644 watcher/decision_engine/strategy/strategies/workload_balance.py
 create mode 100644 watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py

diff --git a/setup.cfg b/setup.cfg
index 24ed95f20..ac5d502e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -51,6 +51,7 @@ watcher_strategies =
     outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
     vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
     workload_stabilization = watcher.decision_engine.strategy.strategies.workload_stabilization:WorkloadStabilization
+    workload_balance = watcher.decision_engine.strategy.strategies.workload_balance:WorkloadBalance
 
 watcher_actions =
     migrate = watcher.applier.actions.migration:Migrate
diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py
new file mode 100644
index 000000000..15bbc72b4
--- /dev/null
+++ b/watcher/decision_engine/strategy/strategies/workload_balance.py
@@ -0,0 +1,324 @@
+# -*- encoding: utf-8 -*-
+# Copyright (c) 2016 Intel Corp
+#
+# Authors: Junjie-Huang
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from oslo_log import log
+
+from watcher._i18n import _, _LE, _LI, _LW
+from watcher.common import exception as wexc
+from watcher.decision_engine.model import resource
+from watcher.decision_engine.model import vm_state
+from watcher.decision_engine.strategy.strategies import base
+from watcher.metrics_engine.cluster_history import ceilometer as ceil
+
+LOG = log.getLogger(__name__)
+
+
+class WorkloadBalance(base.BaseStrategy):
+    """[PoC] Workload balance using live migration
+
+    *Description*
+
+    This is a migration strategy based on the VM workload of physical
+    servers. It generates solutions to move a workload whenever a server's
+    CPU utilization % is higher than the specified threshold. The VM to
+    be moved should bring the host's workload close to the average
+    workload of all hypervisors.
+
+    *Requirements*
+
+    * Hardware: compute nodes should use the same physical CPUs
+    * Software: Ceilometer component ceilometer-agent-compute running
+      in each compute node, and the Ceilometer API able to report the
+      "cpu_util" telemetry successfully
+    * You must have at least 2 physical compute nodes to run this strategy
+
+    *Limitations*
+
+    - This is a proof of concept that is not meant to be used in production
+    - We cannot forecast how many servers should be migrated. This is the
+      reason why we only plan a single virtual machine migration at a time.
+      So it's better to use this algorithm with `CONTINUOUS` audits.
+    - It assumes that live migration is possible
+    """
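+    # NOTE: a worked example of the defaults below: a hypervisor with
+    # 40 physical cores is treated as overloaded once its VMs'
+    # aggregated workload exceeds 25% of 40 = 10.0 cores, with cpu_util
+    # averaged over a 300-second window.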
+    # The meter to report CPU utilization % of VM in ceilometer
+    METER_NAME = "cpu_util"
+    # Unit: %, value range is [0, 100]
+    # TODO(Junjie): make it configurable
+    THRESHOLD = 25.0
+    # choose 300 seconds as the default duration of meter aggregation
+    # TODO(Junjie): make it configurable
+    PERIOD = 300
+
+    MIGRATION = "migrate"
+
+    def __init__(self, osc=None):
+        """Using live migration
+
+        :param osc: an OpenStackClients object
+        """
+        super(WorkloadBalance, self).__init__(osc)
+        # the migration plan will be triggered when the CPU utilization %
+        # reaches the threshold
+        # TODO(Junjie): the threshold should be configurable per audit
+        self.threshold = self.THRESHOLD
+        self._meter = self.METER_NAME
+        self._ceilometer = None
+        self._period = self.PERIOD
+
+    @property
+    def ceilometer(self):
+        if self._ceilometer is None:
+            self._ceilometer = ceil.CeilometerClusterHistory(osc=self.osc)
+        return self._ceilometer
+
+    @ceilometer.setter
+    def ceilometer(self, c):
+        self._ceilometer = c
+
+    @classmethod
+    def get_name(cls):
+        return "workload_balance"
+
+    @classmethod
+    def get_display_name(cls):
+        return _("Workload balance migration strategy")
+
+    @classmethod
+    def get_translatable_display_name(cls):
+        return "Workload balance migration strategy"
+
+    @classmethod
+    def get_goal_name(cls):
+        return "WORKLOAD_OPTIMIZATION"
+
+    @classmethod
+    def get_goal_display_name(cls):
+        return _("Workload optimization")
+
+    @classmethod
+    def get_translatable_goal_display_name(cls):
+        return "Workload optimization"
+
+    def calculate_used_resource(self, model, hypervisor, cap_cores,
+                                cap_mem, cap_disk):
+        """Calculate the used vcpus, memory and disk based on VM flavors"""
+        vms = model.get_mapping().get_node_vms(hypervisor)
+        vcpus_used = 0
+        memory_mb_used = 0
+        disk_gb_used = 0
+        for vm_id in vms:
+            vm = model.get_vm_from_id(vm_id)
+            vcpus_used += cap_cores.get_capacity(vm)
+            memory_mb_used += cap_mem.get_capacity(vm)
+            disk_gb_used += cap_disk.get_capacity(vm)
+
+        return vcpus_used, memory_mb_used, disk_gb_used
+
+    def choose_vm_to_migrate(self, model, hosts, avg_workload,
+                             workload_cache):
+        """Pick an active VM instance to migrate from the provided hosts
+
+        :param model: the origin_model passed from the 'execute' function
+        :param hosts: list of dicts, each containing a hypervisor object
+        :param avg_workload: the average workload of all hypervisors
+        :param workload_cache: the mapping from VM id to its workload
+        """
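+        # NOTE: for example, if a host's workload is 13 cores and the
+        # average is 8, the excess (delta) is 5 cores; a VM contributing
+        # exactly 5 cores of workload is the ideal candidate, so we pick
+        # the VM whose workload is closest to, but not above, the delta.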
+        for hvmap in hosts:
+            source_hypervisor = hvmap['hv']
+            source_vms = model.get_mapping().get_node_vms(source_hypervisor)
+            if source_vms:
+                delta_workload = hvmap['workload'] - avg_workload
+                min_delta = 1000000
+                instance_id = None
+                for vm_id in source_vms:
+                    try:
+                        # consider only active VMs; pick the one whose
+                        # workload best offsets the host's excess
+                        vm = model.get_vm_from_id(vm_id)
+                        if vm.state != vm_state.VMState.ACTIVE.value:
+                            LOG.debug("VM not active, skipped: %s",
+                                      vm.uuid)
+                            continue
+                        current_delta = delta_workload - workload_cache[vm_id]
+                        if 0 <= current_delta < min_delta:
+                            min_delta = current_delta
+                            instance_id = vm_id
+                    except wexc.InstanceNotFound:
+                        LOG.error(_LE("VM not found, error: %s"), vm_id)
+                if instance_id:
+                    return (source_hypervisor,
+                            model.get_vm_from_id(instance_id))
+            else:
+                LOG.info(_LI("No VMs found on hypervisor: %s"),
+                         source_hypervisor.uuid)
+
+    def filter_destination_hosts(self, model, hosts, vm_to_migrate,
+                                 avg_workload, workload_cache):
+        """Only return hosts with sufficient available resources"""
+        cap_cores = model.get_resource_from_id(
+            resource.ResourceType.cpu_cores)
+        cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
+        cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
+
+        required_cores = cap_cores.get_capacity(vm_to_migrate)
+        required_disk = cap_disk.get_capacity(vm_to_migrate)
+        required_mem = cap_mem.get_capacity(vm_to_migrate)
+
+        # filter out hypervisors that lack resources or that the
+        # migration would push over the threshold
+        destination_hosts = []
+        src_vm_workload = workload_cache[vm_to_migrate.uuid]
+        for hvmap in hosts:
+            host = hvmap['hv']
+            workload = hvmap['workload']
+            # calculate the available resources
+            cores_used, mem_used, disk_used = self.calculate_used_resource(
+                model, host, cap_cores, cap_mem, cap_disk)
+            cores_available = cap_cores.get_capacity(host) - cores_used
+            disk_available = cap_disk.get_capacity(host) - disk_used
+            mem_available = cap_mem.get_capacity(host) - mem_used
+            if (cores_available >= required_cores and
+                    disk_available >= required_disk and
+                    mem_available >= required_mem and
+                    (src_vm_workload + workload) <
+                    self.threshold / 100 * cap_cores.get_capacity(host)):
+                destination_hosts.append(hvmap)
+
+        return destination_hosts
+
+    def group_hosts_by_cpu_util(self, model):
+        """Calculate the workload of each hypervisor
+
+        Group the hypervisors that have reached the threshold and the
+        hypervisors that are under it, and also compute the average
+        workload of all hypervisors and the per-VM workload map.
+        """
+        hypervisors = model.get_all_hypervisors()
+        if not hypervisors:
+            raise wexc.ClusterEmpty()
+        cluster_size = len(hypervisors)
+        # get the CPU core capacity of hypervisors and VMs
+        cap_cores = model.get_resource_from_id(
+            resource.ResourceType.cpu_cores)
+        overload_hosts = []
+        nonoverload_hosts = []
+        # total workload of the cluster, i.e. the total number of cores
+        # being utilized across the cluster
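+        # NOTE: a VM's workload is its cpu_util scaled by its vCPU count,
+        # e.g. a VM with 10 vCPUs running at 80% cpu_util contributes
+        # 0.80 * 10 = 8.0 cores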
+        cluster_workload = 0.0
+        # use workload_cache to store the workload of VMs for reuse
+        workload_cache = {}
+        for hypervisor_id in hypervisors:
+            hypervisor = model.get_hypervisor_from_id(hypervisor_id)
+            vms = model.get_mapping().get_node_vms(hypervisor)
+            hypervisor_workload = 0.0
+            for vm_id in vms:
+                vm = model.get_vm_from_id(vm_id)
+                try:
+                    cpu_util = self.ceilometer.statistic_aggregation(
+                        resource_id=vm_id,
+                        meter_name=self._meter,
+                        period=self._period,
+                        aggregate='avg')
+                except Exception as e:
+                    LOG.error(_LE("Cannot get cpu_util: %s"), e)
+                    continue
+                if cpu_util is None:
+                    LOG.debug("%s: cpu_util is None", vm_id)
+                    continue
+                vm_cores = cap_cores.get_capacity(vm)
+                workload_cache[vm_id] = cpu_util * vm_cores / 100
+                hypervisor_workload += workload_cache[vm_id]
+                LOG.debug("%s: cpu_util %f", vm_id, cpu_util)
+            hypervisor_cores = cap_cores.get_capacity(hypervisor)
+            hy_cpu_util = hypervisor_workload / hypervisor_cores * 100
+
+            cluster_workload += hypervisor_workload
+
+            hvmap = {'hv': hypervisor, 'cpu_util': hy_cpu_util,
+                     'workload': hypervisor_workload}
+            if hy_cpu_util >= self.threshold:
+                # mark the hypervisor to release resources
+                overload_hosts.append(hvmap)
+            else:
+                nonoverload_hosts.append(hvmap)
+
+        avg_workload = cluster_workload / cluster_size
+
+        return overload_hosts, nonoverload_hosts, avg_workload, workload_cache
+
+    def execute(self, origin_model):
+        LOG.info(_LI("Initializing Workload Balance Strategy"))
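+        # NOTE: the overall flow implemented below:
+        #   1. group hypervisors by CPU utilization against the threshold
+        #   2. pick one active VM on the most loaded host, the one whose
+        #      workload best offsets the host's excess over the average
+        #   3. keep only the underloaded hosts with enough spare CPU,
+        #      memory and disk for that VM
+        #   4. migrate the VM to the least utilized of those hosts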
+        if origin_model is None:
+            raise wexc.ClusterStateNotDefined()
+
+        current_model = origin_model
+        src_hypervisors, target_hypervisors, avg_workload, workload_cache = (
+            self.group_hosts_by_cpu_util(current_model))
+
+        if not src_hypervisors:
+            LOG.debug("No hosts require optimization")
+            return self.solution
+
+        if not target_hypervisors:
+            LOG.warning(_LW("No hosts currently have CPU utilization under "
+                            "%s percent, therefore there are no possible "
+                            "target hosts for any migration"),
+                        self.threshold)
+            return self.solution
+
+        # choose the server with the highest cpu_util
+        src_hypervisors = sorted(src_hypervisors,
+                                 reverse=True,
+                                 key=lambda x: (x[self.METER_NAME]))
+
+        vm_to_migrate = self.choose_vm_to_migrate(current_model,
+                                                  src_hypervisors,
+                                                  avg_workload,
+                                                  workload_cache)
+        if not vm_to_migrate:
+            return self.solution
+        source_hypervisor, vm_src = vm_to_migrate
+        # find the hosts that have enough resources for the VM
+        destination_hosts = self.filter_destination_hosts(current_model,
+                                                          target_hypervisors,
+                                                          vm_src,
+                                                          avg_workload,
+                                                          workload_cache)
+        if not destination_hosts:
+            LOG.warning(_LW("No proper target host could be found; there "
+                            "may not be enough CPU, memory or disk"))
+            return self.solution
+        # sort the filtered hosts by workload and always use the one
+        # with the lowest CPU utilization as the destination
+        destination_hosts = sorted(destination_hosts,
+                                   key=lambda x: (x["cpu_util"]))
+        mig_dst_hypervisor = destination_hosts[0]['hv']
+        # generate a solution to migrate the VM to the destination server
+        if current_model.get_mapping().migrate_vm(vm_src,
+                                                  source_hypervisor,
+                                                  mig_dst_hypervisor):
+            parameters = {'migration_type': 'live',
+                          'src_hypervisor': source_hypervisor.uuid,
+                          'dst_hypervisor': mig_dst_hypervisor.uuid}
+            self.solution.add_action(action_type=self.MIGRATION,
+                                     resource_id=vm_src.uuid,
+                                     input_parameters=parameters)
+        self.solution.model = current_model
+        return self.solution
diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py
index 5514d4f2a..2877c48ff 100644
--- a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py
+++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py
@@ -235,3 +235,73 @@ class FakerModelCollector(base.BaseClusterModelCollector):
             current_state_cluster.get_vm_from_id("VM_0"))
 
         return current_state_cluster
+
+    def generate_scenario_6_with_2_hypervisors(self):
+        vms = []
+        root = modelroot.ModelRoot()
+        # number of nodes
+        count_node = 2
+
+        # define resources (CPU, memory, disk, ...)
+        mem = resource.Resource(resource.ResourceType.memory)
+        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
+        disk = resource.Resource(resource.ResourceType.disk)
+
+        root.create_resource(mem)
+        root.create_resource(num_cores)
+        root.create_resource(disk)
+
+        for i in range(0, count_node):
+            node_uuid = "Node_{0}".format(i)
+            node = hypervisor.Hypervisor()
+            node.uuid = node_uuid
+            node.hostname = "hostname_{0}".format(i)
+
+            mem.set_capacity(node, 132)
+            disk.set_capacity(node, 250)
+            num_cores.set_capacity(node, 40)
+            root.add_hypervisor(node)
+
+        vm1 = modelvm.VM()
+        vm1.uuid = "VM_1"
+        mem.set_capacity(vm1, 2)
+        disk.set_capacity(vm1, 20)
+        num_cores.set_capacity(vm1, 10)
+        vms.append(vm1)
+        root.add_vm(vm1)
+
+        vm11 = modelvm.VM()
+        vm11.uuid = "73b09e16-35b7-4922-804e-e8f5d9b740fc"
+        mem.set_capacity(vm11, 2)
+        disk.set_capacity(vm11, 20)
+        num_cores.set_capacity(vm11, 10)
+        vms.append(vm11)
+        root.add_vm(vm11)
+
+        vm2 = modelvm.VM()
+        vm2.uuid = "VM_3"
+        mem.set_capacity(vm2, 2)
+        disk.set_capacity(vm2, 20)
+        num_cores.set_capacity(vm2, 10)
+        vms.append(vm2)
+        root.add_vm(vm2)
+
+        vm21 = modelvm.VM()
+        vm21.uuid = "VM_4"
+        mem.set_capacity(vm21, 2)
+        disk.set_capacity(vm21, 20)
+        num_cores.set_capacity(vm21, 10)
+        vms.append(vm21)
+        root.add_vm(vm21)
+
+        root.get_mapping().map(root.get_hypervisor_from_id("Node_0"),
+                               root.get_vm_from_id(str(vm1.uuid)))
+        root.get_mapping().map(root.get_hypervisor_from_id("Node_0"),
+                               root.get_vm_from_id(str(vm11.uuid)))
+
+        root.get_mapping().map(root.get_hypervisor_from_id("Node_1"),
+                               root.get_vm_from_id(str(vm2.uuid)))
+        root.get_mapping().map(root.get_hypervisor_from_id("Node_1"),
+                               root.get_vm_from_id(str(vm21.uuid)))
+        return root
diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py b/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py
index 74379cf27..9c50c1169 100644
--- a/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py
+++ b/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py
@@ -44,6 +44,13 @@ class FakerMetricsCollector(object):
             result = self.get_average_outlet_temperature(resource_id)
         return result
 
+    def mock_get_statistics_wb(self, resource_id, meter_name, period,
+                               aggregate='avg'):
+        result = 0
+        if meter_name == "cpu_util":
+            result = self.get_average_usage_vm_cpu_wb(resource_id)
+        return result
+
     def get_average_outlet_temperature(self, uuid):
         """The average outlet temperature for host"""
         mock = {}
@@ -52,7 +59,6 @@ class FakerMetricsCollector(object):
         mock['Node_1'] = 100
         if uuid not in mock.keys():
             mock[uuid] = 100
-
         return mock[str(uuid)]
 
     def get_usage_node_ram(self, uuid):
@@ -109,6 +115,26 @@ class FakerMetricsCollector(object):
 
         return float(mock[str(uuid)])
 
+    def get_average_usage_vm_cpu_wb(self, uuid):
+        """The last VM CPU usage values to average
+
+        :param uuid: the resource id of the VM
+        :return: the mocked average cpu_util percentage for the VM
+        """
+        mock = {}
+        # node 0
+        mock['VM_1'] = 80
+        mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 50
+        # node 1
+        mock['VM_3'] = 20
+        mock['VM_4'] = 10
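+        # NOTE: scenario 6 gives every VM 10 vCPUs, so these values
+        # translate into workloads of 0.8*10 + 0.5*10 = 13 cores on
+        # Node_0 (32.5% of its 40 cores) and 0.2*10 + 0.1*10 = 3 cores
+        # on Node_1 (7.5%), for a cluster average of (13 + 3) / 2 = 8.0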
+        return float(mock[str(uuid)])
+
     def get_average_usage_vm_cpu(self, uuid):
         """The last VM CPU usage values to average
 
diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py
new file mode 100644
index 000000000..d361eca03
--- /dev/null
+++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py
@@ -0,0 +1,150 @@
+# -*- encoding: utf-8 -*-
+# Copyright (c) 2016 Intel Corp
+#
+# Authors: Junjie-Huang
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import collections
+import mock
+
+from watcher.applier.actions.loading import default
+from watcher.common import exception
+from watcher.decision_engine.model import model_root
+from watcher.decision_engine.model import resource
+from watcher.decision_engine.strategy.strategies import workload_balance
+from watcher.tests import base
+from watcher.tests.decision_engine.strategy.strategies \
+    import faker_cluster_state
+from watcher.tests.decision_engine.strategy.strategies \
+    import faker_metrics_collector
+
+
+class TestWorkloadBalance(base.BaseTestCase):
+    # fake metrics
+    fake_metrics = faker_metrics_collector.FakerMetricsCollector()
+
+    # fake cluster
+    fake_cluster = faker_cluster_state.FakerModelCollector()
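+    # NOTE: in scenario 6, Node_0 runs at 32.5% CPU utilization and
+    # Node_1 at 7.5% (see faker_metrics_collector), so with the default
+    # 25% threshold Node_0 is the only source host and Node_1 the only
+    # target. Node_0 exceeds the 8.0-core average workload by 5 cores,
+    # which VM 73b09e16-35b7-4922-804e-e8f5d9b740fc (0.5 * 10 = 5 cores)
+    # offsets exactly, so it is the VM the strategy should migrate.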
+
+    def test_calc_used_res(self):
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        strategy = workload_balance.WorkloadBalance()
+        hypervisor = model.get_hypervisor_from_id('Node_0')
+        cap_cores = model.get_resource_from_id(
+            resource.ResourceType.cpu_cores)
+        cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
+        cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
+        cores_used, mem_used, disk_used = strategy.calculate_used_resource(
+            model, hypervisor, cap_cores, cap_mem, cap_disk)
+
+        self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40))
+
+    def test_group_hosts_by_cpu_util(self):
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        strategy = workload_balance.WorkloadBalance()
+        strategy.threshold = 30
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
+        self.assertEqual(h1[0]['hv'].uuid, 'Node_0')
+        self.assertEqual(h2[0]['hv'].uuid, 'Node_1')
+        self.assertEqual(avg, 8.0)
+
+    def test_choose_vm_to_migrate(self):
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
+        vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
+        self.assertEqual(vm_to_mig[0].uuid, 'Node_0')
+        self.assertEqual(vm_to_mig[1].uuid,
+                         "73b09e16-35b7-4922-804e-e8f5d9b740fc")
+
+    def test_choose_vm_notfound(self):
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
+        vms = model.get_all_vms()
+        vms.clear()
+        vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
+        self.assertIsNone(vm_to_mig)
+
+    def test_filter_destination_hosts(self):
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
+        vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
+        dest_hosts = strategy.filter_destination_hosts(model, h2,
+                                                       vm_to_mig[1],
+                                                       avg, w_map)
+        self.assertEqual(len(dest_hosts), 1)
+        self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_1')
+
+    def test_exception_model(self):
+        strategy = workload_balance.WorkloadBalance()
+        self.assertRaises(exception.ClusterStateNotDefined,
+                          strategy.execute, None)
+
+    def test_exception_cluster_empty(self):
+        strategy = workload_balance.WorkloadBalance()
+        model = model_root.ModelRoot()
+        self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
+
+    def test_execute_cluster_empty(self):
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        model = model_root.ModelRoot()
+        self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
+
+    def test_execute_no_workload(self):
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+
+        current_state_cluster = faker_cluster_state.FakerModelCollector()
+        model = (current_state_cluster.
+                 generate_scenario_4_with_1_hypervisor_no_vm())
+
+        solution = strategy.execute(model)
+        self.assertEqual(solution.actions, [])
+
+    def test_execute(self):
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        solution = strategy.execute(model)
+        actions_counter = collections.Counter(
+            [action.get('action_type') for action in solution.actions])
+
+        num_migrations = actions_counter.get("migrate", 0)
+        self.assertEqual(num_migrations, 1)
+
+    def test_check_parameters(self):
+        strategy = workload_balance.WorkloadBalance()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
+        model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
+        solution = strategy.execute(model)
+        loader = default.DefaultActionLoader()
+        for action in solution.actions:
+            loaded_action = loader.load(action['action_type'])
+            loaded_action.input_parameters = action['input_parameters']
+            loaded_action.validate_parameters()