Merge "Improve variable names in strategy implementations"

This commit is contained in:
Jenkins
2016-02-18 10:22:50 +00:00
committed by Gerrit Code Review
4 changed files with 123 additions and 110 deletions

View File

@@ -64,10 +64,10 @@ class BaseStrategy(object):
self._osc = osc
@abc.abstractmethod
def execute(self, model):
def execute(self, original_model):
"""Execute a strategy
:param model: The name of the strategy to execute (loaded dynamically)
:param original_model: The model the strategy is executed on
:type model: str
:return: A computed solution (via a placement algorithm)
:rtype: :class:`watcher.decision_engine.solution.base.BaseSolution`

View File

@@ -109,8 +109,8 @@ class BasicConsolidation(base.BaseStrategy):
return self._ceilometer
@ceilometer.setter
def ceilometer(self, c):
self._ceilometer = c
def ceilometer(self, ceilometer):
self._ceilometer = ceilometer
def compute_attempts(self, size_cluster):
"""Upper bound of the number of migration
@@ -119,13 +119,13 @@ class BasicConsolidation(base.BaseStrategy):
"""
self.migration_attempts = size_cluster * self.bound_migration
def check_migration(self, model,
def check_migration(self, cluster_data_model,
src_hypervisor,
dest_hypervisor,
vm_to_mig):
"""check if the migration is possible
:param model: the current state of the cluster
:param cluster_data_model: the current state of the cluster
:param src_hypervisor: the current node of the virtual machine
:param dest_hypervisor: the destination of the virtual machine
:param vm_to_mig: the virtual machine
@@ -142,28 +142,32 @@ class BasicConsolidation(base.BaseStrategy):
total_cores = 0
total_disk = 0
total_mem = 0
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cpu_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
disk_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.disk)
memory_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.memory)
for vm_id in model.get_mapping().get_node_vms(dest_hypervisor):
vm = model.get_vm_from_id(vm_id)
total_cores += cap_cores.get_capacity(vm)
total_disk += cap_disk.get_capacity(vm)
total_mem += cap_mem.get_capacity(vm)
for vm_id in cluster_data_model. \
get_mapping().get_node_vms(dest_hypervisor):
vm = cluster_data_model.get_vm_from_id(vm_id)
total_cores += cpu_capacity.get_capacity(vm)
total_disk += disk_capacity.get_capacity(vm)
total_mem += memory_capacity.get_capacity(vm)
# capacity requested by hypervisor
total_cores += cap_cores.get_capacity(vm_to_mig)
total_disk += cap_disk.get_capacity(vm_to_mig)
total_mem += cap_mem.get_capacity(vm_to_mig)
total_cores += cpu_capacity.get_capacity(vm_to_mig)
total_disk += disk_capacity.get_capacity(vm_to_mig)
total_mem += memory_capacity.get_capacity(vm_to_mig)
return self.check_threshold(model,
return self.check_threshold(cluster_data_model,
dest_hypervisor,
total_cores,
total_disk,
total_mem)
def check_threshold(self, model,
def check_threshold(self, cluster_data_model,
dest_hypervisor,
total_cores,
total_disk,
@@ -173,23 +177,23 @@ class BasicConsolidation(base.BaseStrategy):
check the threshold value defined by the ratio of
aggregated CPU capacity of VMs on one node to CPU capacity
of this node must not exceed the threshold value.
:param dest_hypervisor:
:param cluster_data_model: the current state of the cluster
:param dest_hypervisor: the destination of the virtual machine
:param total_cores
:param total_disk
:param total_mem
:return: True if the threshold is not exceed
"""
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
# available
cores_available = cap_cores.get_capacity(dest_hypervisor)
disk_available = cap_disk.get_capacity(dest_hypervisor)
mem_available = cap_mem.get_capacity(dest_hypervisor)
cpu_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor)
disk_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(dest_hypervisor)
memory_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(dest_hypervisor)
if cores_available >= total_cores * self.threshold_cores \
and disk_available >= total_disk * self.threshold_disk \
and mem_available >= total_mem * self.threshold_mem:
if (cpu_capacity >= total_cores * self.threshold_cores and
disk_capacity >= total_disk * self.threshold_disk and
memory_capacity >= total_mem * self.threshold_mem):
return True
else:
return False
@@ -215,24 +219,25 @@ class BasicConsolidation(base.BaseStrategy):
def get_number_of_migrations(self):
return self.number_of_migrations
def calculate_weight(self, model, element, total_cores_used,
total_disk_used, total_memory_used):
def calculate_weight(self, cluster_data_model, element,
total_cores_used, total_disk_used,
total_memory_used):
"""Calculate weight of every resource
:param model:
:param cluster_data_model:
:param element:
:param total_cores_used:
:param total_disk_used:
:param total_memory_used:
:return:
"""
cpu_capacity = model.get_resource_from_id(
cpu_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(element)
disk_capacity = model.get_resource_from_id(
disk_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.disk).get_capacity(element)
memory_capacity = model.get_resource_from_id(
memory_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(element)
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
@@ -259,25 +264,25 @@ class BasicConsolidation(base.BaseStrategy):
:return:
"""
resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname)
cpu_avg_vm = self.ceilometer. \
vm_avg_cpu_util = self.ceilometer. \
statistic_aggregation(resource_id=resource_id,
meter_name=self.HOST_CPU_USAGE_METRIC_NAME,
period="7200",
aggregate='avg'
)
if cpu_avg_vm is None:
if vm_avg_cpu_util is None:
LOG.error(
_LE("No values returned by %(resource_id)s "
"for %(metric_name)s"),
resource_id=resource_id,
metric_name=self.HOST_CPU_USAGE_METRIC_NAME,
)
cpu_avg_vm = 100
vm_avg_cpu_util = 100
cpu_capacity = model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(hypervisor)
total_cores_used = cpu_capacity * (cpu_avg_vm / 100)
total_cores_used = cpu_capacity * (vm_avg_cpu_util / 100)
return self.calculate_weight(model, hypervisor, total_cores_used,
0,
@@ -295,14 +300,14 @@ class BasicConsolidation(base.BaseStrategy):
else:
return 0
def calculate_score_vm(self, vm, model):
def calculate_score_vm(self, vm, cluster_data_model):
"""Calculate Score of virtual machine
:param vm: the virtual machine
:param model: the model
:param cluster_data_model: the cluster model
:return: score
"""
if model is None:
if cluster_data_model is None:
raise exception.ClusterStateNotDefined()
vm_cpu_utilization = self.ceilometer. \
@@ -321,14 +326,13 @@ class BasicConsolidation(base.BaseStrategy):
)
vm_cpu_utilization = 100
cpu_capacity = model.get_resource_from_id(
cpu_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(vm)
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0)
return self.calculate_weight(model, vm, total_cores_used,
0,
0)
return self.calculate_weight(cluster_data_model, vm,
total_cores_used, 0, 0)
def add_change_service_state(self, resource_id, state):
parameters = {'state': state}
@@ -348,14 +352,16 @@ class BasicConsolidation(base.BaseStrategy):
resource_id=resource_id,
input_parameters=parameters)
def score_of_nodes(self, current_model, score):
def score_of_nodes(self, cluster_data_model, score):
"""Calculate score of nodes based on load by VMs"""
for hypervisor_id in current_model.get_all_hypervisors():
hypervisor = current_model.get_hypervisor_from_id(hypervisor_id)
count = current_model.get_mapping(). \
for hypervisor_id in cluster_data_model.get_all_hypervisors():
hypervisor = cluster_data_model. \
get_hypervisor_from_id(hypervisor_id)
count = cluster_data_model.get_mapping(). \
get_node_vms_from_id(hypervisor_id)
if len(count) > 0:
result = self.calculate_score_node(hypervisor, current_model)
result = self.calculate_score_node(hypervisor,
cluster_data_model)
else:
''' the hypervisor has not VMs '''
result = 0
@@ -363,9 +369,9 @@ class BasicConsolidation(base.BaseStrategy):
score.append((hypervisor_id, result))
return score
def node_and_vm_score(self, s, score, current_model):
def node_and_vm_score(self, sorted_score, score, current_model):
"""Get List of VMs from Node"""
node_to_release = s[len(score) - 1][0]
node_to_release = sorted_score[len(score) - 1][0]
vms_to_mig = current_model.get_mapping().get_node_vms_from_id(
node_to_release)
@@ -395,47 +401,49 @@ class BasicConsolidation(base.BaseStrategy):
OFFLINE.value)
self.number_of_released_nodes += 1
def calculate_m(self, v, current_model, node_to_release, s):
m = 0
for vm in v:
for j in range(0, len(s)):
def calculate_num_migrations(self, sorted_vms, current_model,
node_to_release, sorted_score):
number_migrations = 0
for vm in sorted_vms:
for j in range(0, len(sorted_score)):
mig_vm = current_model.get_vm_from_id(vm[0])
mig_src_hypervisor = current_model.get_hypervisor_from_id(
node_to_release)
mig_dst_hypervisor = current_model.get_hypervisor_from_id(
s[j][0])
sorted_score[j][0])
result = self.check_migration(current_model,
mig_src_hypervisor,
mig_dst_hypervisor, mig_vm)
if result is True:
if result:
self.create_migration_vm(
current_model, mig_vm,
mig_src_hypervisor, mig_dst_hypervisor)
m += 1
number_migrations += 1
break
return m
return number_migrations
def unsuccessful_migration_actualization(self, m, unsuccessful_migration):
if m > 0:
self.number_of_migrations += m
def unsuccessful_migration_actualization(self, number_migrations,
unsuccessful_migration):
if number_migrations > 0:
self.number_of_migrations += number_migrations
return 0
else:
return unsuccessful_migration + 1
def execute(self, orign_model):
def execute(self, original_model):
LOG.info(_LI("Initializing Sercon Consolidation"))
if orign_model is None:
if original_model is None:
raise exception.ClusterStateNotDefined()
# todo(jed) clone model
current_model = orign_model
current_model = original_model
self.efficacy = 100
unsuccessful_migration = 0
first = True
first_migration = True
size_cluster = len(current_model.get_all_hypervisors())
if size_cluster == 0:
raise exception.ClusterEmpty()
@@ -453,18 +461,18 @@ class BasicConsolidation(base.BaseStrategy):
OFFLINE.value)
while self.get_allowed_migration_attempts() >= unsuccessful_migration:
if first is not True:
if not first_migration:
self.efficacy = self.calculate_migration_efficacy()
if self.efficacy < float(self.target_efficacy):
break
first = False
first_migration = False
score = []
score = self.score_of_nodes(current_model, score)
''' sort compute nodes by Score decreasing '''''
s = sorted(score, reverse=True, key=lambda x: (x[1]))
LOG.debug("Hypervisor(s) BFD {0}".format(s))
sorted_score = sorted(score, reverse=True, key=lambda x: (x[1]))
LOG.debug("Hypervisor(s) BFD {0}".format(sorted_score))
''' get Node to be released '''
if len(score) == 0:
@@ -474,17 +482,18 @@ class BasicConsolidation(base.BaseStrategy):
break
node_to_release, vm_score = self.node_and_vm_score(
s, score, current_model)
sorted_score, score, current_model)
''' sort VMs by Score '''
v = sorted(vm_score, reverse=True, key=lambda x: (x[1]))
sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1]))
# BFD: Best Fit Decrease
LOG.debug("VM(s) BFD {0}".format(v))
LOG.debug("VM(s) BFD {0}".format(sorted_vms))
m = self.calculate_m(v, current_model, node_to_release, s)
migrations = self.calculate_num_migrations(
sorted_vms, current_model, node_to_release, sorted_score)
unsuccessful_migration = self.unsuccessful_migration_actualization(
m, unsuccessful_migration)
migrations, unsuccessful_migration)
infos = {
"number_of_migrations": self.number_of_migrations,
"number_of_nodes_released": self.number_of_released_nodes,

View File

@@ -34,7 +34,7 @@ class DummyStrategy(base.BaseStrategy):
osc=None):
super(DummyStrategy, self).__init__(name, description, osc)
def execute(self, model):
def execute(self, original_model):
LOG.debug("Executing Dummy strategy")
parameters = {'message': 'hello World'}
self.solution.add_action(action_type=self.NOP,

View File

@@ -88,25 +88,26 @@ class OutletTempControl(base.BaseStrategy):
def ceilometer(self, c):
self._ceilometer = c
def calc_used_res(self, model, hypervisor, cap_cores, cap_mem, cap_disk):
def calc_used_res(self, cluster_data_model, hypervisor, cpu_capacity,
memory_capacity, disk_capacity):
'''calculate the used vcpus, memory and disk based on VM flavors'''
vms = model.get_mapping().get_node_vms(hypervisor)
vms = cluster_data_model.get_mapping().get_node_vms(hypervisor)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
if len(vms) > 0:
for vm_id in vms:
vm = model.get_vm_from_id(vm_id)
vcpus_used += cap_cores.get_capacity(vm)
memory_mb_used += cap_mem.get_capacity(vm)
disk_gb_used += cap_disk.get_capacity(vm)
vm = cluster_data_model.get_vm_from_id(vm_id)
vcpus_used += cpu_capacity.get_capacity(vm)
memory_mb_used += memory_capacity.get_capacity(vm)
disk_gb_used += disk_capacity.get_capacity(vm)
return vcpus_used, memory_mb_used, disk_gb_used
def group_hosts_by_outlet_temp(self, model):
def group_hosts_by_outlet_temp(self, cluster_data_model):
"""Group hosts based on outlet temp meters"""
hypervisors = model.get_all_hypervisors()
hypervisors = cluster_data_model.get_all_hypervisors()
size_cluster = len(hypervisors)
if size_cluster == 0:
raise wexc.ClusterEmpty()
@@ -114,7 +115,8 @@ class OutletTempControl(base.BaseStrategy):
hosts_need_release = []
hosts_target = []
for hypervisor_id in hypervisors:
hypervisor = model.get_hypervisor_from_id(hypervisor_id)
hypervisor = cluster_data_model.get_hypervisor_from_id(
hypervisor_id)
resource_id = hypervisor.uuid
outlet_temp = self.ceilometer.statistic_aggregation(
@@ -136,17 +138,18 @@ class OutletTempControl(base.BaseStrategy):
hosts_target.append(hvmap)
return hosts_need_release, hosts_target
def choose_vm_to_migrate(self, model, hosts):
def choose_vm_to_migrate(self, cluster_data_model, hosts):
"""pick up an active vm instance to migrate from provided hosts"""
for hvmap in hosts:
mig_src_hypervisor = hvmap['hv']
vms_of_src = model.get_mapping().get_node_vms(mig_src_hypervisor)
vms_of_src = cluster_data_model.get_mapping().get_node_vms(
mig_src_hypervisor)
if len(vms_of_src) > 0:
for vm_id in vms_of_src:
try:
# select the first active VM to migrate
vm = model.get_vm_from_id(vm_id)
vm = cluster_data_model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.info(_LE("VM not active, skipped: %s"),
vm.uuid)
@@ -158,44 +161,45 @@ class OutletTempControl(base.BaseStrategy):
return None
def filter_dest_servers(self, model, hosts, vm_to_migrate):
def filter_dest_servers(self, cluster_data_model, hosts, vm_to_migrate):
"""Only return hosts with sufficient available resources"""
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cpu_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.cpu_cores)
disk_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.disk)
memory_capacity = cluster_data_model.get_resource_from_id(
resource.ResourceType.memory)
required_cores = cap_cores.get_capacity(vm_to_migrate)
required_disk = cap_disk.get_capacity(vm_to_migrate)
required_mem = cap_mem.get_capacity(vm_to_migrate)
required_cores = cpu_capacity.get_capacity(vm_to_migrate)
required_disk = disk_capacity.get_capacity(vm_to_migrate)
required_memory = memory_capacity.get_capacity(vm_to_migrate)
# filter hypervisors without enough resource
dest_servers = []
for hvmap in hosts:
host = hvmap['hv']
# available
cores_used, mem_used, disk_used = self.calc_used_res(model,
host,
cap_cores,
cap_mem,
cap_disk)
cores_available = cap_cores.get_capacity(host) - cores_used
disk_available = cap_disk.get_capacity(host) - mem_used
mem_available = cap_mem.get_capacity(host) - disk_used
cores_used, mem_used, disk_used = self.calc_used_res(
cluster_data_model, host, cpu_capacity, memory_capacity,
disk_capacity)
cores_available = cpu_capacity.get_capacity(host) - cores_used
disk_available = disk_capacity.get_capacity(host) - mem_used
mem_available = memory_capacity.get_capacity(host) - disk_used
if cores_available >= required_cores \
and disk_available >= required_disk \
and mem_available >= required_mem:
and mem_available >= required_memory:
dest_servers.append(hvmap)
return dest_servers
def execute(self, orign_model):
def execute(self, original_model):
LOG.debug("Initializing Outlet temperature strategy")
if orign_model is None:
if original_model is None:
raise wexc.ClusterStateNotDefined()
current_model = orign_model
current_model = original_model
hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp(
current_model)