diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index 67e32a11b..7054819bf 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -64,10 +64,10 @@ class BaseStrategy(object): self._osc = osc @abc.abstractmethod - def execute(self, model): + def execute(self, original_model): """Execute a strategy - :param model: The name of the strategy to execute (loaded dynamically) + :param original_model: The model the strategy is executed on :type model: str :return: A computed solution (via a placement algorithm) :rtype: :class:`watcher.decision_engine.solution.base.BaseSolution` diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index 4c5053115..02887d16f 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -109,8 +109,8 @@ class BasicConsolidation(base.BaseStrategy): return self._ceilometer @ceilometer.setter - def ceilometer(self, c): - self._ceilometer = c + def ceilometer(self, ceilometer): + self._ceilometer = ceilometer def compute_attempts(self, size_cluster): """Upper bound of the number of migration @@ -119,13 +119,13 @@ class BasicConsolidation(base.BaseStrategy): """ self.migration_attempts = size_cluster * self.bound_migration - def check_migration(self, model, + def check_migration(self, cluster_data_model, src_hypervisor, dest_hypervisor, vm_to_mig): """check if the migration is possible - :param model: the current state of the cluster + :param cluster_data_model: the current state of the cluster :param src_hypervisor: the current node of the virtual machine :param dest_hypervisor: the destination of the virtual machine :param vm_to_mig: the virtual machine @@ -142,28 +142,32 @@ class BasicConsolidation(base.BaseStrategy): total_cores = 0 total_disk = 0 total_mem = 0 - cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) - cap_disk = model.get_resource_from_id(resource.ResourceType.disk) - cap_mem = model.get_resource_from_id(resource.ResourceType.memory) + cpu_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.cpu_cores) + disk_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.disk) + memory_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.memory) - for vm_id in model.get_mapping().get_node_vms(dest_hypervisor): - vm = model.get_vm_from_id(vm_id) - total_cores += cap_cores.get_capacity(vm) - total_disk += cap_disk.get_capacity(vm) - total_mem += cap_mem.get_capacity(vm) + for vm_id in cluster_data_model. \ + get_mapping().get_node_vms(dest_hypervisor): + vm = cluster_data_model.get_vm_from_id(vm_id) + total_cores += cpu_capacity.get_capacity(vm) + total_disk += disk_capacity.get_capacity(vm) + total_mem += memory_capacity.get_capacity(vm) # capacity requested by hypervisor - total_cores += cap_cores.get_capacity(vm_to_mig) - total_disk += cap_disk.get_capacity(vm_to_mig) - total_mem += cap_mem.get_capacity(vm_to_mig) + total_cores += cpu_capacity.get_capacity(vm_to_mig) + total_disk += disk_capacity.get_capacity(vm_to_mig) + total_mem += memory_capacity.get_capacity(vm_to_mig) - return self.check_threshold(model, + return self.check_threshold(cluster_data_model, dest_hypervisor, total_cores, total_disk, total_mem) - def check_threshold(self, model, + def check_threshold(self, cluster_data_model, dest_hypervisor, total_cores, total_disk, @@ -173,23 +177,23 @@ class BasicConsolidation(base.BaseStrategy): check the threshold value defined by the ratio of aggregated CPU capacity of VMs on one node to CPU capacity of this node must not exceed the threshold value. - :param dest_hypervisor: + :param cluster_data_model: the current state of the cluster + :param dest_hypervisor: the destination of the virtual machine :param total_cores :param total_disk :param total_mem :return: True if the threshold is not exceed """ - cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) - cap_disk = model.get_resource_from_id(resource.ResourceType.disk) - cap_mem = model.get_resource_from_id(resource.ResourceType.memory) - # available - cores_available = cap_cores.get_capacity(dest_hypervisor) - disk_available = cap_disk.get_capacity(dest_hypervisor) - mem_available = cap_mem.get_capacity(dest_hypervisor) + cpu_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor) + disk_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.disk).get_capacity(dest_hypervisor) + memory_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.memory).get_capacity(dest_hypervisor) - if cores_available >= total_cores * self.threshold_cores \ - and disk_available >= total_disk * self.threshold_disk \ - and mem_available >= total_mem * self.threshold_mem: + if (cpu_capacity >= total_cores * self.threshold_cores and + disk_capacity >= total_disk * self.threshold_disk and + memory_capacity >= total_mem * self.threshold_mem): return True else: return False @@ -215,24 +219,25 @@ class BasicConsolidation(base.BaseStrategy): def get_number_of_migrations(self): return self.number_of_migrations - def calculate_weight(self, model, element, total_cores_used, - total_disk_used, total_memory_used): + def calculate_weight(self, cluster_data_model, element, + total_cores_used, total_disk_used, + total_memory_used): """Calculate weight of every resource - :param model: + :param cluster_data_model: :param element: :param total_cores_used: :param total_disk_used: :param total_memory_used: :return: """ - cpu_capacity = model.get_resource_from_id( + cpu_capacity = cluster_data_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(element) - disk_capacity = model.get_resource_from_id( + disk_capacity = cluster_data_model.get_resource_from_id( resource.ResourceType.disk).get_capacity(element) - memory_capacity = model.get_resource_from_id( + memory_capacity = cluster_data_model.get_resource_from_id( resource.ResourceType.memory).get_capacity(element) score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) / @@ -259,25 +264,25 @@ class BasicConsolidation(base.BaseStrategy): :return: """ resource_id = "%s_%s" % (hypervisor.uuid, hypervisor.hostname) - cpu_avg_vm = self.ceilometer. \ + vm_avg_cpu_util = self.ceilometer. \ statistic_aggregation(resource_id=resource_id, meter_name=self.HOST_CPU_USAGE_METRIC_NAME, period="7200", aggregate='avg' ) - if cpu_avg_vm is None: + if vm_avg_cpu_util is None: LOG.error( _LE("No values returned by %(resource_id)s " "for %(metric_name)s"), resource_id=resource_id, metric_name=self.HOST_CPU_USAGE_METRIC_NAME, ) - cpu_avg_vm = 100 + vm_avg_cpu_util = 100 cpu_capacity = model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(hypervisor) - total_cores_used = cpu_capacity * (cpu_avg_vm / 100) + total_cores_used = cpu_capacity * (vm_avg_cpu_util / 100) return self.calculate_weight(model, hypervisor, total_cores_used, 0, @@ -295,14 +300,14 @@ class BasicConsolidation(base.BaseStrategy): else: return 0 - def calculate_score_vm(self, vm, model): + def calculate_score_vm(self, vm, cluster_data_model): """Calculate Score of virtual machine :param vm: the virtual machine - :param model: the model + :param cluster_data_model: the cluster model :return: score """ - if model is None: + if cluster_data_model is None: raise exception.ClusterStateNotDefined() vm_cpu_utilization = self.ceilometer. \ @@ -321,14 +326,13 @@ class BasicConsolidation(base.BaseStrategy): ) vm_cpu_utilization = 100 - cpu_capacity = model.get_resource_from_id( + cpu_capacity = cluster_data_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(vm) total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0) - return self.calculate_weight(model, vm, total_cores_used, - 0, - 0) + return self.calculate_weight(cluster_data_model, vm, + total_cores_used, 0, 0) def add_change_service_state(self, resource_id, state): parameters = {'state': state} @@ -348,14 +352,16 @@ class BasicConsolidation(base.BaseStrategy): resource_id=resource_id, input_parameters=parameters) - def score_of_nodes(self, current_model, score): + def score_of_nodes(self, cluster_data_model, score): """Calculate score of nodes based on load by VMs""" - for hypervisor_id in current_model.get_all_hypervisors(): - hypervisor = current_model.get_hypervisor_from_id(hypervisor_id) - count = current_model.get_mapping(). \ + for hypervisor_id in cluster_data_model.get_all_hypervisors(): + hypervisor = cluster_data_model. \ + get_hypervisor_from_id(hypervisor_id) + count = cluster_data_model.get_mapping(). \ get_node_vms_from_id(hypervisor_id) if len(count) > 0: - result = self.calculate_score_node(hypervisor, current_model) + result = self.calculate_score_node(hypervisor, + cluster_data_model) else: ''' the hypervisor has not VMs ''' result = 0 @@ -363,9 +369,9 @@ class BasicConsolidation(base.BaseStrategy): score.append((hypervisor_id, result)) return score - def node_and_vm_score(self, s, score, current_model): + def node_and_vm_score(self, sorted_score, score, current_model): """Get List of VMs from Node""" - node_to_release = s[len(score) - 1][0] + node_to_release = sorted_score[len(score) - 1][0] vms_to_mig = current_model.get_mapping().get_node_vms_from_id( node_to_release) @@ -395,47 +401,49 @@ class BasicConsolidation(base.BaseStrategy): OFFLINE.value) self.number_of_released_nodes += 1 - def calculate_m(self, v, current_model, node_to_release, s): - m = 0 - for vm in v: - for j in range(0, len(s)): + def calculate_num_migrations(self, sorted_vms, current_model, + node_to_release, sorted_score): + number_migrations = 0 + for vm in sorted_vms: + for j in range(0, len(sorted_score)): mig_vm = current_model.get_vm_from_id(vm[0]) mig_src_hypervisor = current_model.get_hypervisor_from_id( node_to_release) mig_dst_hypervisor = current_model.get_hypervisor_from_id( - s[j][0]) + sorted_score[j][0]) result = self.check_migration(current_model, mig_src_hypervisor, mig_dst_hypervisor, mig_vm) - if result is True: + if result: self.create_migration_vm( current_model, mig_vm, mig_src_hypervisor, mig_dst_hypervisor) - m += 1 + number_migrations += 1 break - return m + return number_migrations - def unsuccessful_migration_actualization(self, m, unsuccessful_migration): - if m > 0: - self.number_of_migrations += m + def unsuccessful_migration_actualization(self, number_migrations, + unsuccessful_migration): + if number_migrations > 0: + self.number_of_migrations += number_migrations return 0 else: return unsuccessful_migration + 1 - def execute(self, orign_model): + def execute(self, original_model): LOG.info(_LI("Initializing Sercon Consolidation")) - if orign_model is None: + if original_model is None: raise exception.ClusterStateNotDefined() # todo(jed) clone model - current_model = orign_model + current_model = original_model self.efficacy = 100 unsuccessful_migration = 0 - first = True + first_migration = True size_cluster = len(current_model.get_all_hypervisors()) if size_cluster == 0: raise exception.ClusterEmpty() @@ -453,18 +461,18 @@ class BasicConsolidation(base.BaseStrategy): OFFLINE.value) while self.get_allowed_migration_attempts() >= unsuccessful_migration: - if first is not True: + if not first_migration: self.efficacy = self.calculate_migration_efficacy() if self.efficacy < float(self.target_efficacy): break - first = False + first_migration = False score = [] score = self.score_of_nodes(current_model, score) ''' sort compute nodes by Score decreasing ''''' - s = sorted(score, reverse=True, key=lambda x: (x[1])) - LOG.debug("Hypervisor(s) BFD {0}".format(s)) + sorted_score = sorted(score, reverse=True, key=lambda x: (x[1])) + LOG.debug("Hypervisor(s) BFD {0}".format(sorted_score)) ''' get Node to be released ''' if len(score) == 0: @@ -474,17 +482,18 @@ class BasicConsolidation(base.BaseStrategy): break node_to_release, vm_score = self.node_and_vm_score( - s, score, current_model) + sorted_score, score, current_model) ''' sort VMs by Score ''' - v = sorted(vm_score, reverse=True, key=lambda x: (x[1])) + sorted_vms = sorted(vm_score, reverse=True, key=lambda x: (x[1])) # BFD: Best Fit Decrease - LOG.debug("VM(s) BFD {0}".format(v)) + LOG.debug("VM(s) BFD {0}".format(sorted_vms)) - m = self.calculate_m(v, current_model, node_to_release, s) + migrations = self.calculate_num_migrations( + sorted_vms, current_model, node_to_release, sorted_score) unsuccessful_migration = self.unsuccessful_migration_actualization( - m, unsuccessful_migration) + migrations, unsuccessful_migration) infos = { "number_of_migrations": self.number_of_migrations, "number_of_nodes_released": self.number_of_released_nodes, diff --git a/watcher/decision_engine/strategy/strategies/dummy_strategy.py b/watcher/decision_engine/strategy/strategies/dummy_strategy.py index 0402acbbc..bbfda76ce 100644 --- a/watcher/decision_engine/strategy/strategies/dummy_strategy.py +++ b/watcher/decision_engine/strategy/strategies/dummy_strategy.py @@ -34,7 +34,7 @@ class DummyStrategy(base.BaseStrategy): osc=None): super(DummyStrategy, self).__init__(name, description, osc) - def execute(self, model): + def execute(self, original_model): LOG.debug("Executing Dummy strategy") parameters = {'message': 'hello World'} self.solution.add_action(action_type=self.NOP, diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index fc914d74c..7973e279e 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -88,25 +88,26 @@ class OutletTempControl(base.BaseStrategy): def ceilometer(self, c): self._ceilometer = c - def calc_used_res(self, model, hypervisor, cap_cores, cap_mem, cap_disk): + def calc_used_res(self, cluster_data_model, hypervisor, cpu_capacity, + memory_capacity, disk_capacity): '''calculate the used vcpus, memory and disk based on VM flavors''' - vms = model.get_mapping().get_node_vms(hypervisor) + vms = cluster_data_model.get_mapping().get_node_vms(hypervisor) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 if len(vms) > 0: for vm_id in vms: - vm = model.get_vm_from_id(vm_id) - vcpus_used += cap_cores.get_capacity(vm) - memory_mb_used += cap_mem.get_capacity(vm) - disk_gb_used += cap_disk.get_capacity(vm) + vm = cluster_data_model.get_vm_from_id(vm_id) + vcpus_used += cpu_capacity.get_capacity(vm) + memory_mb_used += memory_capacity.get_capacity(vm) + disk_gb_used += disk_capacity.get_capacity(vm) return vcpus_used, memory_mb_used, disk_gb_used - def group_hosts_by_outlet_temp(self, model): + def group_hosts_by_outlet_temp(self, cluster_data_model): """Group hosts based on outlet temp meters""" - hypervisors = model.get_all_hypervisors() + hypervisors = cluster_data_model.get_all_hypervisors() size_cluster = len(hypervisors) if size_cluster == 0: raise wexc.ClusterEmpty() @@ -114,7 +115,8 @@ class OutletTempControl(base.BaseStrategy): hosts_need_release = [] hosts_target = [] for hypervisor_id in hypervisors: - hypervisor = model.get_hypervisor_from_id(hypervisor_id) + hypervisor = cluster_data_model.get_hypervisor_from_id( + hypervisor_id) resource_id = hypervisor.uuid outlet_temp = self.ceilometer.statistic_aggregation( @@ -136,17 +138,18 @@ class OutletTempControl(base.BaseStrategy): hosts_target.append(hvmap) return hosts_need_release, hosts_target - def choose_vm_to_migrate(self, model, hosts): + def choose_vm_to_migrate(self, cluster_data_model, hosts): """pick up an active vm instance to migrate from provided hosts""" for hvmap in hosts: mig_src_hypervisor = hvmap['hv'] - vms_of_src = model.get_mapping().get_node_vms(mig_src_hypervisor) + vms_of_src = cluster_data_model.get_mapping().get_node_vms( + mig_src_hypervisor) if len(vms_of_src) > 0: for vm_id in vms_of_src: try: # select the first active VM to migrate - vm = model.get_vm_from_id(vm_id) + vm = cluster_data_model.get_vm_from_id(vm_id) if vm.state != vm_state.VMState.ACTIVE.value: LOG.info(_LE("VM not active, skipped: %s"), vm.uuid) @@ -158,44 +161,45 @@ class OutletTempControl(base.BaseStrategy): return None - def filter_dest_servers(self, model, hosts, vm_to_migrate): + def filter_dest_servers(self, cluster_data_model, hosts, vm_to_migrate): """Only return hosts with sufficient available resources""" - cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores) - cap_disk = model.get_resource_from_id(resource.ResourceType.disk) - cap_mem = model.get_resource_from_id(resource.ResourceType.memory) + cpu_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.cpu_cores) + disk_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.disk) + memory_capacity = cluster_data_model.get_resource_from_id( + resource.ResourceType.memory) - required_cores = cap_cores.get_capacity(vm_to_migrate) - required_disk = cap_disk.get_capacity(vm_to_migrate) - required_mem = cap_mem.get_capacity(vm_to_migrate) + required_cores = cpu_capacity.get_capacity(vm_to_migrate) + required_disk = disk_capacity.get_capacity(vm_to_migrate) + required_memory = memory_capacity.get_capacity(vm_to_migrate) # filter hypervisors without enough resource dest_servers = [] for hvmap in hosts: host = hvmap['hv'] # available - cores_used, mem_used, disk_used = self.calc_used_res(model, - host, - cap_cores, - cap_mem, - cap_disk) - cores_available = cap_cores.get_capacity(host) - cores_used - disk_available = cap_disk.get_capacity(host) - mem_used - mem_available = cap_mem.get_capacity(host) - disk_used + cores_used, mem_used, disk_used = self.calc_used_res( + cluster_data_model, host, cpu_capacity, memory_capacity, + disk_capacity) + cores_available = cpu_capacity.get_capacity(host) - cores_used + disk_available = disk_capacity.get_capacity(host) - mem_used + mem_available = memory_capacity.get_capacity(host) - disk_used if cores_available >= required_cores \ and disk_available >= required_disk \ - and mem_available >= required_mem: + and mem_available >= required_memory: dest_servers.append(hvmap) return dest_servers - def execute(self, orign_model): + def execute(self, original_model): LOG.debug("Initializing Outlet temperature strategy") - if orign_model is None: + if original_model is None: raise wexc.ClusterStateNotDefined() - current_model = orign_model + current_model = original_model hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp( current_model)