Integrated consolidation strategy with watcher

This patch adds a new load consolidation strategy based on a heuristic
algorithm which focuses on measured CPU utilization and tries to
minimize hosts which have too much or too little load.
A new goal "vm_workload_consolidation" was added which executes
the strategy "VM_WORKLOAD_CONSOLIDATION".
This work depends on the implementation of the fix for the bug:
https://bugs.launchpad.net/watcher/+bug/1553124

Change-Id: Ide05bddb5c85a3df05b94658ee5bd98f32e554b0
Implements: blueprint basic-cloud-consolidation-integration
This commit is contained in:
Bruno Grazioli
2016-03-07 11:30:09 +01:00
committed by cima
parent 64b5a7c3e4
commit 4c924fc505
5 changed files with 1069 additions and 1 deletions

View File

@@ -0,0 +1,264 @@
# -*- encoding: utf-8 -*-
#
# Authors: Vojtech CIMA <cima@zhaw.ch>
# Bruno GRAZIOLI <gaea@zhaw.ch>
# Sean MURPHY <murp@zhaw.ch>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from watcher.decision_engine.model import hypervisor
from watcher.decision_engine.model import model_root as modelroot
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm as modelvm
from watcher.decision_engine.model import vm_state
from watcher.metrics_engine.cluster_model_collector import base
class FakerModelCollector(base.BaseClusterModelCollector):
    """Fake cluster model collector.

    Builds small in-memory cluster data models (hypervisors, VMs and
    their resource capacities) used to exercise the VM workload
    consolidation strategy in tests.
    """

    def __init__(self):
        # Deliberately skip the base-class initialisation: the tests
        # need no real connection to a cluster.
        pass

    def get_latest_cluster_data_model(self):
        # Collector interface entry point; scenario 1 is the default
        # model handed to the strategy.
        return self.generate_scenario_1()

    def generate_scenario_1(self):
        """Simulates cluster with 2 hypervisors and 2 VMs using 1:1 mapping"""
        current_state_cluster = modelroot.ModelRoot()
        count_node = 2
        count_vm = 2

        mem = resource.Resource(resource.ResourceType.memory)
        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
        disk = resource.Resource(resource.ResourceType.disk)
        disk_capacity =\
            resource.Resource(resource.ResourceType.disk_capacity)

        current_state_cluster.create_resource(mem)
        current_state_cluster.create_resource(num_cores)
        current_state_cluster.create_resource(disk)
        current_state_cluster.create_resource(disk_capacity)

        # Two identical hypervisors: 64 GB RAM, 250 GB disk, 40 cores.
        for i in range(0, count_node):
            node_uuid = "Node_{0}".format(i)
            node = hypervisor.Hypervisor()
            node.uuid = node_uuid
            node.hostname = "hostname_{0}".format(i)
            node.state = 'up'

            mem.set_capacity(node, 64)
            disk_capacity.set_capacity(node, 250)
            num_cores.set_capacity(node, 40)
            current_state_cluster.add_hypervisor(node)

        # Two identical VMs: 2 GB RAM, 20 GB disk, 10 cores each.
        for i in range(0, count_vm):
            vm_uuid = "VM_{0}".format(i)
            vm = modelvm.VM()
            vm.uuid = vm_uuid
            vm.state = vm_state.VMState.ACTIVE

            mem.set_capacity(vm, 2)
            disk.set_capacity(vm, 20)
            num_cores.set_capacity(vm, 10)
            current_state_cluster.add_vm(vm)

        # 1:1 mapping: VM_0 on Node_0, VM_1 on Node_1.
        current_state_cluster.get_mapping().map(
            current_state_cluster.get_hypervisor_from_id("Node_0"),
            current_state_cluster.get_vm_from_id("VM_0"))

        current_state_cluster.get_mapping().map(
            current_state_cluster.get_hypervisor_from_id("Node_1"),
            current_state_cluster.get_vm_from_id("VM_1"))

        return current_state_cluster

    def generate_scenario_2(self):
        """Simulates a cluster

        With 4 hypervisors and 6 VMs all mapped to one hypervisor
        """
        current_state_cluster = modelroot.ModelRoot()
        count_node = 4
        count_vm = 6

        mem = resource.Resource(resource.ResourceType.memory)
        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
        disk = resource.Resource(resource.ResourceType.disk)
        disk_capacity =\
            resource.Resource(resource.ResourceType.disk_capacity)

        current_state_cluster.create_resource(mem)
        current_state_cluster.create_resource(num_cores)
        current_state_cluster.create_resource(disk)
        current_state_cluster.create_resource(disk_capacity)

        # Four identical hypervisors: 64 GB RAM, 250 GB disk, 16 cores.
        for i in range(0, count_node):
            node_uuid = "Node_{0}".format(i)
            node = hypervisor.Hypervisor()
            node.uuid = node_uuid
            node.hostname = "hostname_{0}".format(i)
            node.state = 'up'

            mem.set_capacity(node, 64)
            disk_capacity.set_capacity(node, 250)
            num_cores.set_capacity(node, 16)
            current_state_cluster.add_hypervisor(node)

        for i in range(0, count_vm):
            vm_uuid = "VM_{0}".format(i)
            vm = modelvm.VM()
            vm.uuid = vm_uuid
            vm.state = vm_state.VMState.ACTIVE

            mem.set_capacity(vm, 2)
            disk.set_capacity(vm, 20)
            num_cores.set_capacity(vm, 10)
            current_state_cluster.add_vm(vm)

            # Pack every VM onto Node_0 so the strategy has an
            # overloaded host to offload from.
            current_state_cluster.get_mapping().map(
                current_state_cluster.get_hypervisor_from_id("Node_0"),
                current_state_cluster.get_vm_from_id("VM_%s" % str(i)))

        return current_state_cluster

    def generate_scenario_3(self):
        """Simulates a cluster

        With 2 hypervisors and 4 VMs (VM_6 to VM_9) all mapped to one
        hypervisor
        """
        current_state_cluster = modelroot.ModelRoot()
        count_node = 2
        count_vm = 4

        mem = resource.Resource(resource.ResourceType.memory)
        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
        disk = resource.Resource(resource.ResourceType.disk)
        disk_capacity =\
            resource.Resource(resource.ResourceType.disk_capacity)

        current_state_cluster.create_resource(mem)
        current_state_cluster.create_resource(num_cores)
        current_state_cluster.create_resource(disk)
        current_state_cluster.create_resource(disk_capacity)

        # Two identical hypervisors: 64 GB RAM, 250 GB disk, 10 cores.
        for i in range(0, count_node):
            node_uuid = "Node_{0}".format(i)
            node = hypervisor.Hypervisor()
            node.uuid = node_uuid
            node.hostname = "hostname_{0}".format(i)
            node.state = 'up'

            mem.set_capacity(node, 64)
            disk_capacity.set_capacity(node, 250)
            num_cores.set_capacity(node, 10)
            current_state_cluster.add_hypervisor(node)

        # VM_6..VM_9 with exponentially growing core counts (1, 2, 4, 8)
        # so utilization differs per VM.
        for i in range(6, 6 + count_vm):
            vm_uuid = "VM_{0}".format(i)
            vm = modelvm.VM()
            vm.uuid = vm_uuid
            vm.state = vm_state.VMState.ACTIVE

            mem.set_capacity(vm, 2)
            disk.set_capacity(vm, 20)
            num_cores.set_capacity(vm, 2 ** (i-6))
            current_state_cluster.add_vm(vm)

            # All VMs start on Node_0.
            current_state_cluster.get_mapping().map(
                current_state_cluster.get_hypervisor_from_id("Node_0"),
                current_state_cluster.get_vm_from_id("VM_%s" % str(i)))

        return current_state_cluster
class FakeCeilometerMetrics(object):
    """Fake Ceilometer metrics source.

    Serves canned per-VM metrics and derives hypervisor CPU
    utilization from the VM-hypervisor mappings of the given cluster
    model, so strategy tests get deterministic, internally consistent
    measurements.
    """

    def __init__(self, model):
        # Cluster model used to resolve which VMs run on a hypervisor.
        self.model = model

    def mock_get_statistics(self, resource_id, meter_name, period=3600,
                            aggregate='avg'):
        """Dispatch a statistics request to the matching fake meter.

        :param resource_id: hypervisor or VM id the meter refers to
        :param meter_name: Ceilometer meter name
        :param period: ignored; kept for interface compatibility
        :param aggregate: ignored; kept for interface compatibility
        :return: the fake measurement, or None for an unknown meter
        """
        if meter_name == "compute.node.cpu.percent":
            return self.get_hypervisor_cpu_util(resource_id)
        elif meter_name == "cpu_util":
            return self.get_vm_cpu_util(resource_id)
        elif meter_name == "memory.usage":
            return self.get_vm_ram_util(resource_id)
        elif meter_name == "disk.root.size":
            return self.get_vm_disk_root_size(resource_id)

    def get_hypervisor_cpu_util(self, r_id):
        """Calculate hypervisor utilization dynamically.

        Hypervisor CPU utilization should consider
        and correlate with actual VM-hypervisor mappings
        provided within a cluster model.
        Returns relative hypervisor CPU utilization <0, 100>.
        :param r_id: resource id
        """
        # Keep only the "<name>_<index>" prefix of the resource id.
        # NOTE: renamed from `id` to avoid shadowing the builtin.
        node_id = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1])
        vms = self.model.get_mapping().get_node_vms_from_id(node_id)
        util_sum = 0.0
        hypervisor_cpu_cores = self.model.get_resource_from_id(
            resource.ResourceType.cpu_cores).get_capacity_from_id(node_id)
        for vm_uuid in vms:
            vm_cpu_cores = self.model.get_resource_from_id(
                resource.ResourceType.cpu_cores).\
                get_capacity(self.model.get_vm_from_id(vm_uuid))
            # Used cores = allocated cores weighted by the VM's
            # relative CPU utilization.
            total_cpu_util = vm_cpu_cores * self.get_vm_cpu_util(vm_uuid)
            util_sum += total_cpu_util / 100.0
        util_sum /= hypervisor_cpu_cores
        return util_sum * 100.0

    def get_vm_cpu_util(self, r_id):
        """Return the canned CPU utilization (percent) for a VM."""
        vm_cpu_util = {
            'VM_0': 10, 'VM_1': 30, 'VM_2': 60, 'VM_3': 20, 'VM_4': 40,
            'VM_5': 50, 'VM_6': 100, 'VM_7': 100, 'VM_8': 100,
            'VM_9': 100,
        }
        return vm_cpu_util[str(r_id)]

    def get_vm_ram_util(self, r_id):
        """Return the canned RAM usage (GB) for a VM."""
        vm_ram_util = {
            'VM_0': 1, 'VM_1': 2, 'VM_2': 4, 'VM_3': 8, 'VM_4': 3,
            'VM_5': 2, 'VM_6': 1, 'VM_7': 2, 'VM_8': 4, 'VM_9': 8,
        }
        return vm_ram_util[str(r_id)]

    def get_vm_disk_root_size(self, r_id):
        """Return the canned root disk size (GB) for a VM."""
        vm_disk_util = {
            'VM_0': 10, 'VM_1': 15, 'VM_2': 30, 'VM_3': 35, 'VM_4': 20,
            'VM_5': 25, 'VM_6': 25, 'VM_7': 25, 'VM_8': 25, 'VM_9': 25,
        }
        return vm_disk_util[str(r_id)]

View File

@@ -0,0 +1,277 @@
# -*- encoding: utf-8 -*-
#
# Authors: Vojtech CIMA <cima@zhaw.ch>
# Bruno GRAZIOLI <gaea@zhaw.ch>
# Sean MURPHY <murp@zhaw.ch>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_and_metrics
class TestSmartConsolidation(base.BaseTestCase):
    """Unit tests for the VM workload consolidation strategy.

    Each test builds a fake cluster model, wires a fake Ceilometer
    metrics source into the strategy, and checks one strategy
    primitive (utilization queries, migration bookkeeping, the
    offload/consolidation phases) against hand-computed expectations
    derived from the canned metrics in faker_cluster_and_metrics.
    """

    # Shared fake collector; each test generates a fresh scenario.
    fake_cluster = faker_cluster_and_metrics.FakerModelCollector()

    def test_get_vm_utilization(self):
        cluster = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        vm_0 = cluster.get_vm_from_id("VM_0")
        # VM_0: 10 cores at 10% -> 1.0 used core; 1 GB RAM; 10 GB disk
        # (values from the fake metrics).
        vm_util = dict(cpu=1.0, ram=1, disk=10)
        self.assertEqual(vm_util,
                         strategy.get_vm_utilization(vm_0.uuid, cluster))

    def test_get_hypervisor_utilization(self):
        cluster = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        node_0 = cluster.get_hypervisor_from_id("Node_0")
        # Node_0 hosts only VM_0, so its utilization equals VM_0's.
        node_util = dict(cpu=1.0, ram=1, disk=10)
        self.assertEqual(node_util,
                         strategy.get_hypervisor_utilization(node_0, cluster))

    def test_get_hypervisor_capacity(self):
        cluster = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        node_0 = cluster.get_hypervisor_from_id("Node_0")
        # Capacities as set up in scenario 1: 40 cores, 64 GB RAM,
        # 250 GB disk.
        node_util = dict(cpu=40, ram=64, disk=250)
        self.assertEqual(node_util,
                         strategy.get_hypervisor_capacity(node_0, cluster))

    def test_get_relative_hypervisor_utilization(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        hypervisor = model.get_hypervisor_from_id('Node_0')
        rhu = strategy.get_relative_hypervisor_utilization(hypervisor, model)
        # Utilization / capacity: 10/250 disk, 1/64 ram, 1/40 cpu.
        expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025}
        self.assertEqual(expected_rhu, rhu)

    def test_get_relative_cluster_utilization(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        cru = strategy.get_relative_cluster_utilization(model)
        expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375}
        self.assertEqual(expected_cru, cru)

    def test_add_migration(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h1 = model.get_hypervisor_from_id('Node_0')
        h2 = model.get_hypervisor_from_id('Node_1')
        vm_uuid = 'VM_0'
        strategy.add_migration(vm_uuid, h1, h2, model)
        self.assertEqual(1, len(strategy.solution.actions))
        expected = {'action_type': 'migrate',
                    'input_parameters': {'dst_hypervisor': h2.uuid,
                                         'src_hypervisor': h1.uuid,
                                         'migration_type': 'live',
                                         'resource_id': vm_uuid}}
        self.assertEqual(expected, strategy.solution.actions[0])

    def test_is_overloaded(self):
        strategy = strategies.VMWorkloadConsolidation()
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h1 = model.get_hypervisor_from_id('Node_0')
        # Node_0's relative CPU utilization is 0.025, so the 0.025
        # threshold is the exact boundary: equal is not overloaded,
        # just below is.
        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
        res = strategy.is_overloaded(h1, model, cc)
        self.assertEqual(False, res)

        cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
        res = strategy.is_overloaded(h1, model, cc)
        self.assertEqual(False, res)

        cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0}
        res = strategy.is_overloaded(h1, model, cc)
        self.assertEqual(True, res)

    def test_vm_fits(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h = model.get_hypervisor_from_id('Node_1')
        vm_uuid = 'VM_0'
        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
        res = strategy.vm_fits(vm_uuid, h, model, cc)
        self.assertEqual(True, res)

        # Tight CPU threshold: adding VM_0 to Node_1 would exceed it.
        cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
        res = strategy.vm_fits(vm_uuid, h, model, cc)
        self.assertEqual(False, res)

    def test_add_action_activate_hypervisor(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h = model.get_hypervisor_from_id('Node_0')
        strategy.add_action_activate_hypervisor(h)
        expected = [{'action_type': 'change_nova_service_state',
                     'input_parameters': {'state': 'up',
                                          'resource_id': 'Node_0'}}]
        self.assertEqual(expected, strategy.solution.actions)

    def test_add_action_deactivate_hypervisor(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h = model.get_hypervisor_from_id('Node_0')
        strategy.add_action_deactivate_hypervisor(h)
        expected = [{'action_type': 'change_nova_service_state',
                     'input_parameters': {'state': 'down',
                                          'resource_id': 'Node_0'}}]
        self.assertEqual(expected, strategy.solution.actions)

    def test_deactivate_unused_hypervisors(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h1 = model.get_hypervisor_from_id('Node_0')
        h2 = model.get_hypervisor_from_id('Node_1')
        vm_uuid = 'VM_0'
        # Both hypervisors host a VM, so nothing can be deactivated.
        strategy.deactivate_unused_hypervisors(model)
        self.assertEqual(0, len(strategy.solution.actions))

        # Migrate VM to free the hypervisor
        strategy.add_migration(vm_uuid, h1, h2, model)
        strategy.deactivate_unused_hypervisors(model)
        expected = {'action_type': 'change_nova_service_state',
                    'input_parameters': {'state': 'down',
                                         'resource_id': 'Node_0'}}
        # One migration action plus one deactivation action.
        self.assertEqual(2, len(strategy.solution.actions))
        self.assertEqual(expected, strategy.solution.actions[1])

    def test_offload_phase(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        # With generous thresholds no hypervisor is overloaded, so the
        # offload phase produces no actions.
        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
        strategy.offload_phase(model, cc)
        expected = []
        self.assertEqual(expected, strategy.solution.actions)

    def test_consolidation_phase(self):
        model = self.fake_cluster.generate_scenario_1()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h1 = model.get_hypervisor_from_id('Node_0')
        h2 = model.get_hypervisor_from_id('Node_1')
        vm_uuid = 'VM_0'
        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
        # The consolidation phase should pack both VMs onto a single
        # hypervisor by migrating VM_0 from Node_0 to Node_1.
        strategy.consolidation_phase(model, cc)
        expected = [{'action_type': 'migrate',
                     'input_parameters': {'dst_hypervisor': h2.uuid,
                                          'src_hypervisor': h1.uuid,
                                          'migration_type': 'live',
                                          'resource_id': vm_uuid}}]
        self.assertEqual(expected, strategy.solution.actions)

    def test_strategy(self):
        # End-to-end run on scenario 2 (all 6 VMs on Node_0):
        # offload + consolidation + optimization.
        model = self.fake_cluster.generate_scenario_2()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h1 = model.get_hypervisor_from_id('Node_0')
        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
        strategy.offload_phase(model, cc)
        strategy.consolidation_phase(model, cc)
        strategy.optimize_solution(model)
        # The destination hypervisor is whichever node the strategy
        # picked first; read it back from the produced action.
        h2 = strategy.solution.actions[0][
            'input_parameters']['dst_hypervisor']
        expected = [{'action_type': 'migrate',
                     'input_parameters': {'dst_hypervisor': h2,
                                          'src_hypervisor': h1.uuid,
                                          'migration_type': 'live',
                                          'resource_id': 'VM_3'}},
                    {'action_type': 'migrate',
                     'input_parameters': {'dst_hypervisor': h2,
                                          'src_hypervisor': h1.uuid,
                                          'migration_type': 'live',
                                          'resource_id': 'VM_1'}}]
        self.assertEqual(expected, strategy.solution.actions)

    def test_strategy2(self):
        # Scenario 3: 2 hypervisors, 4 fully-loaded VMs on Node_0.
        model = self.fake_cluster.generate_scenario_3()
        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
        strategy = strategies.VMWorkloadConsolidation()
        strategy.ceilometer = mock.MagicMock(
            statistic_aggregation=fake_metrics.mock_get_statistics)
        h1 = model.get_hypervisor_from_id('Node_0')
        h2 = model.get_hypervisor_from_id('Node_1')
        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
        # Offload moves VM_6..VM_8 off the overloaded Node_0.
        strategy.offload_phase(model, cc)
        expected = [{'action_type': 'migrate',
                     'input_parameters': {'dst_hypervisor': h2.uuid,
                                          'migration_type': 'live',
                                          'resource_id': 'VM_6',
                                          'src_hypervisor': h1.uuid}},
                    {'action_type': 'migrate',
                     'input_parameters': {'dst_hypervisor': h2.uuid,
                                          'migration_type': 'live',
                                          'resource_id': 'VM_7',
                                          'src_hypervisor': h1.uuid}},
                    {'action_type': 'migrate',
                     'input_parameters': {'dst_hypervisor': h2.uuid,
                                          'migration_type': 'live',
                                          'resource_id': 'VM_8',
                                          'src_hypervisor': h1.uuid}}]
        self.assertEqual(expected, strategy.solution.actions)

        # Consolidation then moves VM_7 back to Node_0.
        strategy.consolidation_phase(model, cc)
        expected.append({'action_type': 'migrate',
                         'input_parameters': {'dst_hypervisor': h1.uuid,
                                              'migration_type': 'live',
                                              'resource_id': 'VM_7',
                                              'src_hypervisor': h2.uuid}})
        self.assertEqual(expected, strategy.solution.actions)

        # Optimization drops the VM_7 round trip (actions 1 and 3
        # cancel each other out).
        strategy.optimize_solution(model)
        del expected[3]
        del expected[1]
        self.assertEqual(expected, strategy.solution.actions)