Add gnocchi support in VM-Workload-Consolidation strategy

This patch adds gnocchi support in VM-Workload-Consolidation strategy
and adds unit tests corresponding to that change.

Change-Id: I4aab158a6b7c92cb9fe8979bb8bd6338c4686b11
Partiallly-Implements: bp gnocchi-watcher
This commit is contained in:
Santhosh Fernandes
2017-03-28 15:33:10 +05:30
parent 4a3c15185c
commit 4642a92e78
3 changed files with 260 additions and 45 deletions

View File

@@ -52,13 +52,16 @@ correctly on all compute nodes within the cluster.
This strategy assumes it is possible to live migrate any VM from
an active compute node to any other active compute node.
"""
import datetime
from oslo_config import cfg
from oslo_log import log
import six
from watcher._i18n import _
from watcher.common import exception
from watcher.datasource import ceilometer as ceil
from watcher.datasource import gnocchi as gnoc
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
@@ -68,12 +71,33 @@ LOG = log.getLogger(__name__)
class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"""VM Workload Consolidation Strategy"""
HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent'
INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util'
METRIC_NAMES = dict(
ceilometer=dict(
cpu_util_metric='cpu_util',
ram_util_metric='memory.usage',
ram_alloc_metric='memory',
disk_alloc_metric='disk.root.size'),
gnocchi=dict(
cpu_util_metric='cpu_util',
ram_util_metric='memory.usage',
ram_alloc_metric='memory',
disk_alloc_metric='disk.root.size'),
)
MIGRATION = "migrate"
CHANGE_NOVA_SERVICE_STATE = "change_nova_service_state"
def __init__(self, config, osc=None):
super(VMWorkloadConsolidation, self).__init__(config, osc)
self._ceilometer = None
self._gnocchi = None
self.number_of_migrations = 0
self.number_of_released_nodes = 0
self.ceilometer_instance_data_cache = dict()
# self.ceilometer_instance_data_cache = dict()
self.datasource_instance_data_cache = dict()
@classmethod
def get_name(cls):
@@ -101,6 +125,20 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
def ceilometer(self, ceilometer):
self._ceilometer = ceilometer
@property
def gnocchi(self):
if self._gnocchi is None:
self._gnocchi = gnoc.GnocchiHelper(osc=self.osc)
return self._gnocchi
@gnocchi.setter
def gnocchi(self, gnocchi):
self._gnocchi = gnocchi
@property
def granularity(self):
return self.input_parameters.get('granularity', 300)
@classmethod
def get_schema(cls):
# Mandatory default setting for each element
@@ -111,10 +149,26 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"getting statistic aggregation",
"type": "number",
"default": 3600
}
},
"granularity": {
"description": "The time between two measures in an "
"aggregated timeseries of a metric.",
"type": "number",
"default": 300
},
}
}
@classmethod
def get_config_opts(cls):
return [
cfg.StrOpt(
"datasource",
help="Data source to use in order to query the needed metrics",
default="ceilometer",
choices=["ceilometer", "gnocchi"])
]
def get_state_str(self, state):
"""Get resource state in string format.
@@ -139,7 +193,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"""
params = {'state': element.ServiceState.ENABLED.value}
self.solution.add_action(
action_type='change_nova_service_state',
action_type=self.CHANGE_NOVA_SERVICE_STATE,
resource_id=node.uuid,
input_parameters=params)
self.number_of_released_nodes -= 1
@@ -152,7 +206,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"""
params = {'state': element.ServiceState.DISABLED.value}
self.solution.add_action(
action_type='change_nova_service_state',
action_type=self.CHANGE_NOVA_SERVICE_STATE,
resource_id=node.uuid,
input_parameters=params)
self.number_of_released_nodes += 1
@@ -189,7 +243,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
params = {'migration_type': migration_type,
'source_node': source_node.uuid,
'destination_node': destination_node.uuid}
self.solution.add_action(action_type='migrate',
self.solution.add_action(action_type=self.MIGRATION,
resource_id=instance.uuid,
input_parameters=params)
self.number_of_migrations += 1
@@ -205,56 +259,98 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
element.ServiceState.DISABLED.value):
self.add_action_disable_node(node)
def get_instance_utilization(self, instance, aggr='avg'):
def get_instance_utilization(self, instance):
"""Collect cpu, ram and disk utilization statistics of a VM.
:param instance: instance object
:param aggr: string
:return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
"""
if instance.uuid in self.ceilometer_instance_data_cache.keys():
return self.ceilometer_instance_data_cache.get(instance.uuid)
instance_cpu_util = None
instance_ram_util = None
instance_disk_util = None
cpu_util_metric = 'cpu_util'
ram_util_metric = 'memory.usage'
if instance.uuid in self.datasource_instance_data_cache.keys():
return self.datasource_instance_data_cache.get(instance.uuid)
ram_alloc_metric = 'memory'
disk_alloc_metric = 'disk.root.size'
instance_cpu_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=cpu_util_metric,
period=self.period, aggregate=aggr)
cpu_util_metric = self.METRIC_NAMES[
self.config.datasource]['cpu_util_metric']
ram_util_metric = self.METRIC_NAMES[
self.config.datasource]['ram_util_metric']
ram_alloc_metric = self.METRIC_NAMES[
self.config.datasource]['ram_alloc_metric']
disk_alloc_metric = self.METRIC_NAMES[
self.config.datasource]['disk_alloc_metric']
if self.config.datasource == "ceilometer":
instance_cpu_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=cpu_util_metric,
period=self.period, aggregate='avg')
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=ram_util_metric,
period=self.period, aggregate='avg')
if not instance_ram_util:
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=ram_alloc_metric,
period=self.period, aggregate='avg')
instance_disk_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=disk_alloc_metric,
period=self.period, aggregate='avg')
elif self.config.datasource == "gnocchi":
stop_time = datetime.datetime.utcnow()
start_time = stop_time - datetime.timedelta(
seconds=int(self.period))
instance_cpu_util = self.gnocchi.statistic_aggregation(
resource_id=instance.uuid,
metric=cpu_util_metric,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean'
)
instance_ram_util = self.gnocchi.statistic_aggregation(
resource_id=instance.uuid,
metric=ram_util_metric,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean'
)
if not instance_ram_util:
instance_ram_util = self.gnocchi.statistic_aggregation(
resource_id=instance.uuid,
metric=ram_alloc_metric,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean'
)
instance_disk_util = self.gnocchi.statistic_aggregation(
resource_id=instance.uuid,
metric=disk_alloc_metric,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean'
)
if instance_cpu_util:
total_cpu_utilization = (
instance.vcpus * (instance_cpu_util / 100.0))
else:
total_cpu_utilization = instance.vcpus
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=ram_util_metric,
period=self.period, aggregate=aggr)
if not instance_ram_util:
instance_ram_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=ram_alloc_metric,
period=self.period, aggregate=aggr)
instance_disk_util = self.ceilometer.statistic_aggregation(
resource_id=instance.uuid, meter_name=disk_alloc_metric,
period=self.period, aggregate=aggr)
if not instance_ram_util or not instance_disk_util:
LOG.error(
'No values returned by %s for memory.usage '
'or disk.root.size', instance.uuid)
raise exception.NoDataFound
self.ceilometer_instance_data_cache[instance.uuid] = dict(
self.datasource_instance_data_cache[instance.uuid] = dict(
cpu=total_cpu_utilization, ram=instance_ram_util,
disk=instance_disk_util)
return self.ceilometer_instance_data_cache.get(instance.uuid)
return self.datasource_instance_data_cache.get(instance.uuid)
def get_node_utilization(self, node, aggr='avg'):
def get_node_utilization(self, node):
"""Collect cpu, ram and disk utilization statistics of a node.
:param node: node object
@@ -267,7 +363,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
node_cpu_util = 0
for instance in node_instances:
instance_util = self.get_instance_utilization(
instance, aggr)
instance)
node_cpu_util += instance_util['cpu']
node_ram_util += instance_util['ram']
node_disk_util += instance_util['disk']
@@ -372,7 +468,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
"""
migrate_actions = (
a for a in self.solution.actions if a[
'action_type'] == 'migrate')
'action_type'] == self.MIGRATION)
instance_to_be_migrated = (
a['input_parameters']['resource_id'] for a in migrate_actions)
instance_uuids = list(set(instance_to_be_migrated))

View File

@@ -157,3 +157,86 @@ class FakeCeilometerMetrics(object):
instance_disk_util['INSTANCE_8'] = 25
instance_disk_util['INSTANCE_9'] = 25
return instance_disk_util[str(r_id)]
class FakeGnocchiMetrics(object):
def __init__(self, model):
self.model = model
def mock_get_statistics(self, resource_id, metric, granularity,
start_time, stop_time, aggregation='mean'):
if metric == "compute.node.cpu.percent":
return self.get_node_cpu_util(resource_id)
elif metric == "cpu_util":
return self.get_instance_cpu_util(resource_id)
elif metric == "memory.usage":
return self.get_instance_ram_util(resource_id)
elif metric == "disk.root.size":
return self.get_instance_disk_root_size(resource_id)
def get_node_cpu_util(self, r_id):
"""Calculates node utilization dynamicaly.
node CPU utilization should consider
and corelate with actual instance-node mappings
provided within a cluster model.
Returns relative node CPU utilization <0, 100>.
:param r_id: resource id
"""
node_uuid = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1])
node = self.model.get_node_by_uuid(node_uuid)
instances = self.model.get_node_instances(node)
util_sum = 0.0
for instance_uuid in instances:
instance = self.model.get_instance_by_uuid(instance_uuid)
total_cpu_util = instance.vcpus * self.get_instance_cpu_util(
instance.uuid)
util_sum += total_cpu_util / 100.0
util_sum /= node.vcpus
return util_sum * 100.0
@staticmethod
def get_instance_cpu_util(r_id):
instance_cpu_util = dict()
instance_cpu_util['INSTANCE_0'] = 10
instance_cpu_util['INSTANCE_1'] = 30
instance_cpu_util['INSTANCE_2'] = 60
instance_cpu_util['INSTANCE_3'] = 20
instance_cpu_util['INSTANCE_4'] = 40
instance_cpu_util['INSTANCE_5'] = 50
instance_cpu_util['INSTANCE_6'] = 100
instance_cpu_util['INSTANCE_7'] = 100
instance_cpu_util['INSTANCE_8'] = 100
instance_cpu_util['INSTANCE_9'] = 100
return instance_cpu_util[str(r_id)]
@staticmethod
def get_instance_ram_util(r_id):
instance_ram_util = dict()
instance_ram_util['INSTANCE_0'] = 1
instance_ram_util['INSTANCE_1'] = 2
instance_ram_util['INSTANCE_2'] = 4
instance_ram_util['INSTANCE_3'] = 8
instance_ram_util['INSTANCE_4'] = 3
instance_ram_util['INSTANCE_5'] = 2
instance_ram_util['INSTANCE_6'] = 1
instance_ram_util['INSTANCE_7'] = 2
instance_ram_util['INSTANCE_8'] = 4
instance_ram_util['INSTANCE_9'] = 8
return instance_ram_util[str(r_id)]
@staticmethod
def get_instance_disk_root_size(r_id):
instance_disk_util = dict()
instance_disk_util['INSTANCE_0'] = 10
instance_disk_util['INSTANCE_1'] = 15
instance_disk_util['INSTANCE_2'] = 30
instance_disk_util['INSTANCE_3'] = 35
instance_disk_util['INSTANCE_4'] = 20
instance_disk_util['INSTANCE_5'] = 25
instance_disk_util['INSTANCE_6'] = 25
instance_disk_util['INSTANCE_7'] = 25
instance_disk_util['INSTANCE_8'] = 25
instance_disk_util['INSTANCE_9'] = 25
return instance_disk_util[str(r_id)]

View File

@@ -18,6 +18,7 @@
# limitations under the License.
#
import datetime
import mock
from watcher.common import exception
@@ -29,6 +30,17 @@ from watcher.tests.decision_engine.model import faker_cluster_and_metrics
class TestVMWorkloadConsolidation(base.TestCase):
scenarios = [
("Ceilometer",
{"datasource": "ceilometer",
"fake_datasource_cls":
faker_cluster_and_metrics.FakeCeilometerMetrics}),
("Gnocchi",
{"datasource": "gnocchi",
"fake_datasource_cls":
faker_cluster_and_metrics.FakeGnocchiMetrics}),
]
def setUp(self):
super(TestVMWorkloadConsolidation, self).setUp()
@@ -41,11 +53,11 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
p_ceilometer = mock.patch.object(
strategies.VMWorkloadConsolidation, "ceilometer",
p_datasource = mock.patch.object(
strategies.VMWorkloadConsolidation, self.datasource,
new_callable=mock.PropertyMock)
self.m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
self.m_datasource = p_datasource.start()
self.addCleanup(p_datasource.stop)
p_audit_scope = mock.patch.object(
strategies.VMWorkloadConsolidation, "audit_scope",
@@ -57,13 +69,14 @@ class TestVMWorkloadConsolidation(base.TestCase):
self.m_audit_scope.return_value = mock.Mock()
# fake metrics
self.fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
self.fake_metrics = self.fake_datasource_cls(
self.m_model.return_value)
self.m_model.return_value = model_root.ModelRoot()
self.m_ceilometer.return_value = mock.Mock(
self.m_datasource.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
self.strategy = strategies.VMWorkloadConsolidation(config=mock.Mock())
self.strategy = strategies.VMWorkloadConsolidation(
config=mock.Mock(datasource=self.datasource))
def test_exception_stale_cdm(self):
self.fake_cluster.set_cluster_data_model_as_stale()
@@ -81,7 +94,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
instance_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(
instance_util,
self.strategy.get_instance_utilization(instance_0, model))
self.strategy.get_instance_utilization(instance_0))
def test_get_node_utilization(self):
model = self.fake_cluster.generate_scenario_1()
@@ -91,7 +104,7 @@ class TestVMWorkloadConsolidation(base.TestCase):
node_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(
node_util,
self.strategy.get_node_utilization(node_0, model))
self.strategy.get_node_utilization(node_0))
def test_get_node_capacity(self):
model = self.fake_cluster.generate_scenario_1()
@@ -301,10 +314,33 @@ class TestVMWorkloadConsolidation(base.TestCase):
strategies.VMWorkloadConsolidation, "ceilometer")
m_ceilometer = p_ceilometer.start()
self.addCleanup(p_ceilometer.stop)
p_gnocchi = mock.patch.object(
strategies.VMWorkloadConsolidation, "gnocchi")
m_gnocchi = p_gnocchi.start()
self.addCleanup(p_gnocchi.stop)
datetime_patcher = mock.patch.object(
datetime, 'datetime',
mock.Mock(wraps=datetime.datetime)
)
mocked_datetime = datetime_patcher.start()
mocked_datetime.utcnow.return_value = datetime.datetime(
2017, 3, 19, 18, 53, 11, 657417)
self.addCleanup(datetime_patcher.stop)
m_ceilometer.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
m_gnocchi.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics)
instance0 = model.get_instance_by_uuid("INSTANCE_0")
self.strategy.get_instance_utilization(instance0)
m_ceilometer.statistic_aggregation.assert_any_call(
aggregate='avg', meter_name='disk.root.size',
period=3600, resource_id=instance0.uuid)
if self.strategy.config.datasource == "ceilometer":
m_ceilometer.statistic_aggregation.assert_any_call(
aggregate='avg', meter_name='disk.root.size',
period=3600, resource_id=instance0.uuid)
elif self.strategy.config.datasource == "gnocchi":
stop_time = datetime.datetime.utcnow()
start_time = stop_time - datetime.timedelta(
seconds=int('3600'))
m_gnocchi.statistic_aggregation.assert_called_with(
resource_id=instance0.uuid, metric='disk.root.size',
granularity=300, start_time=start_time, stop_time=stop_time,
aggregation='mean')