Loadable Cluster Data Model Collectors
In this changeset, I made BaseClusterDataModelCollector instances pluggable. This corresponds to "part 1" of the work items detailed in the specifications. Change-Id: Iab1c7e264add9e2cbbbb767e3fd6e99a0c22c691 Partially-Implements: blueprint cluster-model-objects-wrapper
This commit is contained in:
@@ -13,8 +13,10 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from watcher._i18n import _
|
||||
from watcher.common import exception
|
||||
from watcher.common import utils
|
||||
from watcher.decision_engine.model import hypervisor
|
||||
from watcher.decision_engine.model import mapping
|
||||
from watcher.decision_engine.model import vm
|
||||
@@ -22,10 +24,10 @@ from watcher.decision_engine.model import vm
|
||||
|
||||
class ModelRoot(object):
|
||||
def __init__(self):
|
||||
self._hypervisors = {}
|
||||
self._vms = {}
|
||||
self._hypervisors = utils.Struct()
|
||||
self._vms = utils.Struct()
|
||||
self.mapping = mapping.Mapping(self)
|
||||
self.resource = {}
|
||||
self.resource = utils.Struct()
|
||||
|
||||
def assert_hypervisor(self, obj):
|
||||
if not isinstance(obj, hypervisor.Hypervisor):
|
||||
|
||||
@@ -75,7 +75,7 @@ class BaseStrategy(loadable.Loadable):
|
||||
self._solution = default.DefaultSolution(goal=self.goal, strategy=self)
|
||||
self._osc = osc
|
||||
self._collector_manager = None
|
||||
self._model = None
|
||||
self._compute_model = None
|
||||
self._goal = None
|
||||
self._input_parameters = utils.Struct()
|
||||
|
||||
@@ -159,24 +159,24 @@ class BaseStrategy(loadable.Loadable):
|
||||
return self.solution
|
||||
|
||||
@property
|
||||
def collector(self):
|
||||
def collector_manager(self):
|
||||
if self._collector_manager is None:
|
||||
self._collector_manager = manager.CollectorManager()
|
||||
return self._collector_manager
|
||||
|
||||
@property
|
||||
def model(self):
|
||||
def compute_model(self):
|
||||
"""Cluster data model
|
||||
|
||||
:returns: Cluster data model the strategy is executed on
|
||||
:rtype model: :py:class:`~.ModelRoot` instance
|
||||
"""
|
||||
if self._model is None:
|
||||
collector = self.collector.get_cluster_model_collector(
|
||||
osc=self.osc)
|
||||
self._model = collector.get_latest_cluster_data_model()
|
||||
if self._compute_model is None:
|
||||
collector = self.collector_manager.get_cluster_model_collector(
|
||||
'compute', osc=self.osc)
|
||||
self._compute_model = collector.get_latest_cluster_data_model()
|
||||
|
||||
return self._model
|
||||
return self._compute_model
|
||||
|
||||
@classmethod
|
||||
def get_schema(cls):
|
||||
|
||||
@@ -155,16 +155,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
total_cores = 0
|
||||
total_disk = 0
|
||||
total_mem = 0
|
||||
cpu_capacity = self.model.get_resource_from_id(
|
||||
cpu_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores)
|
||||
disk_capacity = self.model.get_resource_from_id(
|
||||
disk_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk)
|
||||
memory_capacity = self.model.get_resource_from_id(
|
||||
memory_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory)
|
||||
|
||||
for vm_id in self.model. \
|
||||
for vm_id in self.compute_model. \
|
||||
get_mapping().get_node_vms(dest_hypervisor):
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
total_cores += cpu_capacity.get_capacity(vm)
|
||||
total_disk += disk_capacity.get_capacity(vm)
|
||||
total_mem += memory_capacity.get_capacity(vm)
|
||||
@@ -191,11 +191,11 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
:param total_mem: total memory used by the virtual machine
|
||||
:return: True if the threshold is not exceed
|
||||
"""
|
||||
cpu_capacity = self.model.get_resource_from_id(
|
||||
cpu_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor)
|
||||
disk_capacity = self.model.get_resource_from_id(
|
||||
disk_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk).get_capacity(dest_hypervisor)
|
||||
memory_capacity = self.model.get_resource_from_id(
|
||||
memory_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory).get_capacity(dest_hypervisor)
|
||||
|
||||
return (cpu_capacity >= total_cores * self.threshold_cores and
|
||||
@@ -222,13 +222,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
:param total_memory_used:
|
||||
:return:
|
||||
"""
|
||||
cpu_capacity = self.model.get_resource_from_id(
|
||||
cpu_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores).get_capacity(element)
|
||||
|
||||
disk_capacity = self.model.get_resource_from_id(
|
||||
disk_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk).get_capacity(element)
|
||||
|
||||
memory_capacity = self.model.get_resource_from_id(
|
||||
memory_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory).get_capacity(element)
|
||||
|
||||
score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) /
|
||||
@@ -269,7 +269,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
)
|
||||
host_avg_cpu_util = 100
|
||||
|
||||
cpu_capacity = self.model.get_resource_from_id(
|
||||
cpu_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores).get_capacity(hypervisor)
|
||||
|
||||
total_cores_used = cpu_capacity * (host_avg_cpu_util / 100)
|
||||
@@ -292,7 +292,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
"""Calculate Score of virtual machine
|
||||
|
||||
:param vm: the virtual machine
|
||||
:param self.model: the cluster model
|
||||
:return: score
|
||||
"""
|
||||
vm_cpu_utilization = self.ceilometer. \
|
||||
@@ -311,7 +310,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
)
|
||||
vm_cpu_utilization = 100
|
||||
|
||||
cpu_capacity = self.model.get_resource_from_id(
|
||||
cpu_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores).get_capacity(vm)
|
||||
|
||||
total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0)
|
||||
@@ -338,10 +337,10 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
|
||||
def score_of_nodes(self, score):
|
||||
"""Calculate score of nodes based on load by VMs"""
|
||||
for hypervisor_id in self.model.get_all_hypervisors():
|
||||
hypervisor = self.model. \
|
||||
for hypervisor_id in self.compute_model.get_all_hypervisors():
|
||||
hypervisor = self.compute_model. \
|
||||
get_hypervisor_from_id(hypervisor_id)
|
||||
count = self.model.get_mapping(). \
|
||||
count = self.compute_model.get_mapping(). \
|
||||
get_node_vms_from_id(hypervisor_id)
|
||||
if len(count) > 0:
|
||||
result = self.calculate_score_node(hypervisor)
|
||||
@@ -355,12 +354,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
def node_and_vm_score(self, sorted_score, score):
|
||||
"""Get List of VMs from node"""
|
||||
node_to_release = sorted_score[len(score) - 1][0]
|
||||
vms_to_mig = self.model.get_mapping().get_node_vms_from_id(
|
||||
vms_to_mig = self.compute_model.get_mapping().get_node_vms_from_id(
|
||||
node_to_release)
|
||||
|
||||
vm_score = []
|
||||
for vm_id in vms_to_mig:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
if vm.state == vm_state.VMState.ACTIVE.value:
|
||||
vm_score.append(
|
||||
(vm_id, self.calculate_score_vm(vm)))
|
||||
@@ -370,13 +369,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
def create_migration_vm(self, mig_vm, mig_src_hypervisor,
|
||||
mig_dst_hypervisor):
|
||||
"""Create migration VM"""
|
||||
if self.model.get_mapping().migrate_vm(
|
||||
if self.compute_model.get_mapping().migrate_vm(
|
||||
mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
|
||||
self.add_migration(mig_vm.uuid, 'live',
|
||||
mig_src_hypervisor.uuid,
|
||||
mig_dst_hypervisor.uuid)
|
||||
|
||||
if len(self.model.get_mapping().get_node_vms(
|
||||
if len(self.compute_model.get_mapping().get_node_vms(
|
||||
mig_src_hypervisor)) == 0:
|
||||
self.add_change_service_state(mig_src_hypervisor.
|
||||
uuid,
|
||||
@@ -389,10 +388,10 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
number_migrations = 0
|
||||
for vm in sorted_vms:
|
||||
for j in range(0, len(sorted_score)):
|
||||
mig_vm = self.model.get_vm_from_id(vm[0])
|
||||
mig_src_hypervisor = self.model.get_hypervisor_from_id(
|
||||
mig_vm = self.compute_model.get_vm_from_id(vm[0])
|
||||
mig_src_hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
node_to_release)
|
||||
mig_dst_hypervisor = self.model.get_hypervisor_from_id(
|
||||
mig_dst_hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
sorted_score[j][0])
|
||||
|
||||
result = self.check_migration(
|
||||
@@ -414,7 +413,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
|
||||
def pre_execute(self):
|
||||
LOG.info(_LI("Initializing Sercon Consolidation"))
|
||||
if self.model is None:
|
||||
if self.compute_model is None:
|
||||
raise exception.ClusterStateNotDefined()
|
||||
|
||||
def do_execute(self):
|
||||
@@ -423,15 +422,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
unsuccessful_migration = 0
|
||||
|
||||
first_migration = True
|
||||
size_cluster = len(self.model.get_all_hypervisors())
|
||||
size_cluster = len(self.compute_model.get_all_hypervisors())
|
||||
if size_cluster == 0:
|
||||
raise exception.ClusterEmpty()
|
||||
|
||||
self.compute_attempts(size_cluster)
|
||||
|
||||
for hypervisor_id in self.model.get_all_hypervisors():
|
||||
hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
|
||||
count = self.model.get_mapping(). \
|
||||
for hypervisor_id in self.compute_model.get_all_hypervisors():
|
||||
hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
hypervisor_id)
|
||||
count = self.compute_model.get_mapping(). \
|
||||
get_node_vms_from_id(hypervisor_id)
|
||||
if len(count) == 0:
|
||||
if hypervisor.state == hyper_state.HypervisorState.ENABLED:
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
from oslo_log import log
|
||||
|
||||
from watcher._i18n import _
|
||||
from watcher.common import exception
|
||||
from watcher.decision_engine.strategy.strategies import base
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@@ -50,8 +49,7 @@ class DummyStrategy(base.DummyBaseStrategy):
|
||||
SLEEP = "sleep"
|
||||
|
||||
def pre_execute(self):
|
||||
if self.model is None:
|
||||
raise exception.ClusterStateNotDefined()
|
||||
pass
|
||||
|
||||
def do_execute(self):
|
||||
para1 = self.input_parameters.para1
|
||||
|
||||
@@ -125,13 +125,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
def calc_used_res(self, hypervisor, cpu_capacity,
|
||||
memory_capacity, disk_capacity):
|
||||
"""Calculate the used vcpus, memory and disk based on VM flavors"""
|
||||
vms = self.model.get_mapping().get_node_vms(hypervisor)
|
||||
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
|
||||
vcpus_used = 0
|
||||
memory_mb_used = 0
|
||||
disk_gb_used = 0
|
||||
if len(vms) > 0:
|
||||
for vm_id in vms:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
vcpus_used += cpu_capacity.get_capacity(vm)
|
||||
memory_mb_used += memory_capacity.get_capacity(vm)
|
||||
disk_gb_used += disk_capacity.get_capacity(vm)
|
||||
@@ -140,7 +140,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
|
||||
def group_hosts_by_outlet_temp(self):
|
||||
"""Group hosts based on outlet temp meters"""
|
||||
hypervisors = self.model.get_all_hypervisors()
|
||||
hypervisors = self.compute_model.get_all_hypervisors()
|
||||
size_cluster = len(hypervisors)
|
||||
if size_cluster == 0:
|
||||
raise wexc.ClusterEmpty()
|
||||
@@ -148,7 +148,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
hosts_need_release = []
|
||||
hosts_target = []
|
||||
for hypervisor_id in hypervisors:
|
||||
hypervisor = self.model.get_hypervisor_from_id(
|
||||
hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
hypervisor_id)
|
||||
resource_id = hypervisor.uuid
|
||||
|
||||
@@ -175,13 +175,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
"""Pick up an active vm instance to migrate from provided hosts"""
|
||||
for hvmap in hosts:
|
||||
mig_src_hypervisor = hvmap['hv']
|
||||
vms_of_src = self.model.get_mapping().get_node_vms(
|
||||
vms_of_src = self.compute_model.get_mapping().get_node_vms(
|
||||
mig_src_hypervisor)
|
||||
if len(vms_of_src) > 0:
|
||||
for vm_id in vms_of_src:
|
||||
try:
|
||||
# select the first active VM to migrate
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
if vm.state != vm_state.VMState.ACTIVE.value:
|
||||
LOG.info(_LI("VM not active, skipped: %s"),
|
||||
vm.uuid)
|
||||
@@ -195,11 +195,11 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
|
||||
def filter_dest_servers(self, hosts, vm_to_migrate):
|
||||
"""Only return hosts with sufficient available resources"""
|
||||
cpu_capacity = self.model.get_resource_from_id(
|
||||
cpu_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores)
|
||||
disk_capacity = self.model.get_resource_from_id(
|
||||
disk_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk)
|
||||
memory_capacity = self.model.get_resource_from_id(
|
||||
memory_capacity = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory)
|
||||
|
||||
required_cores = cpu_capacity.get_capacity(vm_to_migrate)
|
||||
@@ -226,7 +226,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
def pre_execute(self):
|
||||
LOG.debug("Initializing Outlet temperature strategy")
|
||||
|
||||
if self.model is None:
|
||||
if self.compute_model is None:
|
||||
raise wexc.ClusterStateNotDefined()
|
||||
|
||||
def do_execute(self):
|
||||
@@ -270,7 +270,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
# always use the host with lowerest outlet temperature
|
||||
mig_dst_hypervisor = dest_servers[0]['hv']
|
||||
# generate solution to migrate the vm to the dest server,
|
||||
if self.model.get_mapping().migrate_vm(
|
||||
if self.compute_model.get_mapping().migrate_vm(
|
||||
vm_src, mig_src_hypervisor, mig_dst_hypervisor):
|
||||
parameters = {'migration_type': 'live',
|
||||
'src_hypervisor': mig_src_hypervisor.uuid,
|
||||
@@ -280,5 +280,5 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
|
||||
input_parameters=parameters)
|
||||
|
||||
def post_execute(self):
|
||||
self.solution.model = self.model
|
||||
self.solution.model = self.compute_model
|
||||
# TODO(v-francoise): Add the indicators to the solution
|
||||
|
||||
@@ -130,12 +130,12 @@ class UniformAirflow(base.BaseStrategy):
|
||||
def calculate_used_resource(self, hypervisor, cap_cores, cap_mem,
|
||||
cap_disk):
|
||||
"""Calculate the used vcpus, memory and disk based on VM flavors"""
|
||||
vms = self.model.get_mapping().get_node_vms(hypervisor)
|
||||
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
|
||||
vcpus_used = 0
|
||||
memory_mb_used = 0
|
||||
disk_gb_used = 0
|
||||
for vm_id in vms:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
vcpus_used += cap_cores.get_capacity(vm)
|
||||
memory_mb_used += cap_mem.get_capacity(vm)
|
||||
disk_gb_used += cap_disk.get_capacity(vm)
|
||||
@@ -150,7 +150,7 @@ class UniformAirflow(base.BaseStrategy):
|
||||
vms_tobe_migrate = []
|
||||
for hvmap in hosts:
|
||||
source_hypervisor = hvmap['hv']
|
||||
source_vms = self.model.get_mapping().get_node_vms(
|
||||
source_vms = self.compute_model.get_mapping().get_node_vms(
|
||||
source_hypervisor)
|
||||
if source_vms:
|
||||
inlet_t = self.ceilometer.statistic_aggregation(
|
||||
@@ -168,7 +168,7 @@ class UniformAirflow(base.BaseStrategy):
|
||||
# hardware issue, migrate all vms from this hypervisor
|
||||
for vm_id in source_vms:
|
||||
try:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
vms_tobe_migrate.append(vm)
|
||||
except wexc.InstanceNotFound:
|
||||
LOG.error(_LE("VM not found; error: %s"), vm_id)
|
||||
@@ -177,7 +177,7 @@ class UniformAirflow(base.BaseStrategy):
|
||||
# migrate the first active vm
|
||||
for vm_id in source_vms:
|
||||
try:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
if vm.state != vm_state.VMState.ACTIVE.value:
|
||||
LOG.info(_LI("VM not active; skipped: %s"),
|
||||
vm.uuid)
|
||||
@@ -193,10 +193,11 @@ class UniformAirflow(base.BaseStrategy):
|
||||
def filter_destination_hosts(self, hosts, vms_to_migrate):
|
||||
"""Return vm and host with sufficient available resources"""
|
||||
|
||||
cap_cores = self.model.get_resource_from_id(
|
||||
cap_cores = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores)
|
||||
cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk)
|
||||
cap_mem = self.model.get_resource_from_id(
|
||||
cap_disk = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk)
|
||||
cap_mem = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory)
|
||||
# large vm go first
|
||||
vms_to_migrate = sorted(vms_to_migrate, reverse=True,
|
||||
@@ -240,13 +241,14 @@ class UniformAirflow(base.BaseStrategy):
|
||||
def group_hosts_by_airflow(self):
|
||||
"""Group hosts based on airflow meters"""
|
||||
|
||||
hypervisors = self.model.get_all_hypervisors()
|
||||
hypervisors = self.compute_model.get_all_hypervisors()
|
||||
if not hypervisors:
|
||||
raise wexc.ClusterEmpty()
|
||||
overload_hosts = []
|
||||
nonoverload_hosts = []
|
||||
for hypervisor_id in hypervisors:
|
||||
hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
|
||||
hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
hypervisor_id)
|
||||
resource_id = hypervisor.uuid
|
||||
airflow = self.ceilometer.statistic_aggregation(
|
||||
resource_id=resource_id,
|
||||
@@ -270,7 +272,7 @@ class UniformAirflow(base.BaseStrategy):
|
||||
def pre_execute(self):
|
||||
LOG.debug("Initializing Uniform Airflow Strategy")
|
||||
|
||||
if self.model is None:
|
||||
if self.compute_model is None:
|
||||
raise wexc.ClusterStateNotDefined()
|
||||
|
||||
def do_execute(self):
|
||||
@@ -310,9 +312,8 @@ class UniformAirflow(base.BaseStrategy):
|
||||
for info in destination_hosts:
|
||||
vm_src = info['vm']
|
||||
mig_dst_hypervisor = info['hv']
|
||||
if self.model.get_mapping().migrate_vm(vm_src,
|
||||
source_hypervisor,
|
||||
mig_dst_hypervisor):
|
||||
if self.compute_model.get_mapping().migrate_vm(
|
||||
vm_src, source_hypervisor, mig_dst_hypervisor):
|
||||
parameters = {'migration_type': 'live',
|
||||
'src_hypervisor': source_hypervisor.uuid,
|
||||
'dst_hypervisor': mig_dst_hypervisor.uuid}
|
||||
@@ -321,5 +322,5 @@ class UniformAirflow(base.BaseStrategy):
|
||||
input_parameters=parameters)
|
||||
|
||||
def post_execute(self):
|
||||
self.solution.model = self.model
|
||||
self.solution.model = self.compute_model
|
||||
# TODO(v-francoise): Add the indicators to the solution
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from copy import deepcopy
|
||||
|
||||
from oslo_log import log
|
||||
import six
|
||||
@@ -209,14 +207,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
hyper_state.HypervisorState.DISABLED.value):
|
||||
self.add_action_disable_hypervisor(hypervisor)
|
||||
|
||||
def get_prediction_model(self):
|
||||
"""Return a deepcopy of a model representing current cluster state.
|
||||
|
||||
:param model: model_root object
|
||||
:return: model_root object
|
||||
"""
|
||||
return deepcopy(self.model)
|
||||
|
||||
def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'):
|
||||
"""Collect cpu, ram and disk utilization statistics of a VM.
|
||||
|
||||
@@ -501,7 +491,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
asc += 1
|
||||
|
||||
def pre_execute(self):
|
||||
if self.model is None:
|
||||
if self.compute_model is None:
|
||||
raise exception.ClusterStateNotDefined()
|
||||
|
||||
def do_execute(self):
|
||||
@@ -519,7 +509,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
|
||||
:param original_model: root_model object
|
||||
"""
|
||||
LOG.info(_LI('Executing Smart Strategy'))
|
||||
model = self.get_prediction_model()
|
||||
model = self.compute_model.get_latest_cluster_data_model()
|
||||
rcu = self.get_relative_cluster_utilization(model)
|
||||
self.ceilometer_vm_data_cache = dict()
|
||||
|
||||
|
||||
@@ -108,12 +108,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
def calculate_used_resource(self, hypervisor, cap_cores, cap_mem,
|
||||
cap_disk):
|
||||
"""Calculate the used vcpus, memory and disk based on VM flavors"""
|
||||
vms = self.model.get_mapping().get_node_vms(hypervisor)
|
||||
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
|
||||
vcpus_used = 0
|
||||
memory_mb_used = 0
|
||||
disk_gb_used = 0
|
||||
for vm_id in vms:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
vcpus_used += cap_cores.get_capacity(vm)
|
||||
memory_mb_used += cap_mem.get_capacity(vm)
|
||||
disk_gb_used += cap_disk.get_capacity(vm)
|
||||
@@ -129,7 +129,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
"""
|
||||
for hvmap in hosts:
|
||||
source_hypervisor = hvmap['hv']
|
||||
source_vms = self.model.get_mapping().get_node_vms(
|
||||
source_vms = self.compute_model.get_mapping().get_node_vms(
|
||||
source_hypervisor)
|
||||
if source_vms:
|
||||
delta_workload = hvmap['workload'] - avg_workload
|
||||
@@ -138,7 +138,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
for vm_id in source_vms:
|
||||
try:
|
||||
# select the first active VM to migrate
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
if vm.state != vm_state.VMState.ACTIVE.value:
|
||||
LOG.debug("VM not active; skipped: %s",
|
||||
vm.uuid)
|
||||
@@ -150,8 +150,8 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
except wexc.InstanceNotFound:
|
||||
LOG.error(_LE("VM not found; error: %s"), vm_id)
|
||||
if instance_id:
|
||||
return source_hypervisor, self.model.get_vm_from_id(
|
||||
instance_id)
|
||||
return (source_hypervisor,
|
||||
self.compute_model.get_vm_from_id(instance_id))
|
||||
else:
|
||||
LOG.info(_LI("VM not found on hypervisor: %s"),
|
||||
source_hypervisor.uuid)
|
||||
@@ -160,10 +160,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
avg_workload, workload_cache):
|
||||
'''Only return hosts with sufficient available resources'''
|
||||
|
||||
cap_cores = self.model.get_resource_from_id(
|
||||
cap_cores = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores)
|
||||
cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk)
|
||||
cap_mem = self.model.get_resource_from_id(resource.ResourceType.memory)
|
||||
cap_disk = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk)
|
||||
cap_mem = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory)
|
||||
|
||||
required_cores = cap_cores.get_capacity(vm_to_migrate)
|
||||
required_disk = cap_disk.get_capacity(vm_to_migrate)
|
||||
@@ -201,12 +203,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
and also generate the VM workload map.
|
||||
"""
|
||||
|
||||
hypervisors = self.model.get_all_hypervisors()
|
||||
hypervisors = self.compute_model.get_all_hypervisors()
|
||||
cluster_size = len(hypervisors)
|
||||
if not hypervisors:
|
||||
raise wexc.ClusterEmpty()
|
||||
# get cpu cores capacity of hypervisors and vms
|
||||
cap_cores = self.model.get_resource_from_id(
|
||||
cap_cores = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores)
|
||||
overload_hosts = []
|
||||
nonoverload_hosts = []
|
||||
@@ -216,11 +218,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
# use workload_cache to store the workload of VMs for reuse purpose
|
||||
workload_cache = {}
|
||||
for hypervisor_id in hypervisors:
|
||||
hypervisor = self.model.get_hypervisor_from_id(hypervisor_id)
|
||||
vms = self.model.get_mapping().get_node_vms(hypervisor)
|
||||
hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
hypervisor_id)
|
||||
vms = self.compute_model.get_mapping().get_node_vms(hypervisor)
|
||||
hypervisor_workload = 0.0
|
||||
for vm_id in vms:
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
try:
|
||||
cpu_util = self.ceilometer.statistic_aggregation(
|
||||
resource_id=vm_id,
|
||||
@@ -262,7 +265,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
"""
|
||||
LOG.info(_LI("Initializing Workload Balance Strategy"))
|
||||
|
||||
if self.model is None:
|
||||
if self.compute_model is None:
|
||||
raise wexc.ClusterStateNotDefined()
|
||||
|
||||
def do_execute(self):
|
||||
@@ -308,9 +311,9 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
key=lambda x: (x["cpu_util"]))
|
||||
# always use the host with lowerest CPU utilization
|
||||
mig_dst_hypervisor = destination_hosts[0]['hv']
|
||||
# generate solution to migrate the vm to the dest server
|
||||
if self.model.get_mapping().migrate_vm(vm_src, source_hypervisor,
|
||||
mig_dst_hypervisor):
|
||||
# generate solution to migrate the vm to the dest server,
|
||||
if self.compute_model.get_mapping().migrate_vm(
|
||||
vm_src, source_hypervisor, mig_dst_hypervisor):
|
||||
parameters = {'migration_type': 'live',
|
||||
'src_hypervisor': source_hypervisor.uuid,
|
||||
'dst_hypervisor': mig_dst_hypervisor.uuid}
|
||||
@@ -323,4 +326,4 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
|
||||
|
||||
This can be used to compute the global efficacy
|
||||
"""
|
||||
self.solution.model = self.model
|
||||
self.solution.model = self.compute_model
|
||||
|
||||
@@ -171,9 +171,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
:return: dict
|
||||
"""
|
||||
LOG.debug('get_vm_load started')
|
||||
vm_vcpus = self.model.get_resource_from_id(
|
||||
vm_vcpus = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores).get_capacity(
|
||||
self.model.get_vm_from_id(vm_uuid))
|
||||
self.compute_model.get_vm_from_id(vm_uuid))
|
||||
vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus}
|
||||
for meter in self.metrics:
|
||||
avg_meter = self.ceilometer.statistic_aggregation(
|
||||
@@ -192,9 +192,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
normalized_hosts = deepcopy(hosts)
|
||||
for host in normalized_hosts:
|
||||
if 'memory.resident' in normalized_hosts[host]:
|
||||
h_memory = self.model.get_resource_from_id(
|
||||
h_memory = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.memory).get_capacity(
|
||||
self.model.get_hypervisor_from_id(host))
|
||||
self.compute_model.get_hypervisor_from_id(host))
|
||||
normalized_hosts[host]['memory.resident'] /= float(h_memory)
|
||||
|
||||
return normalized_hosts
|
||||
@@ -202,11 +202,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
def get_hosts_load(self):
|
||||
"""Get load of every host by gathering vms load"""
|
||||
hosts_load = {}
|
||||
for hypervisor_id in self.model.get_all_hypervisors():
|
||||
for hypervisor_id in self.compute_model.get_all_hypervisors():
|
||||
hosts_load[hypervisor_id] = {}
|
||||
host_vcpus = self.model.get_resource_from_id(
|
||||
host_vcpus = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.cpu_cores).get_capacity(
|
||||
self.model.get_hypervisor_from_id(hypervisor_id))
|
||||
self.compute_model.get_hypervisor_from_id(hypervisor_id))
|
||||
hosts_load[hypervisor_id]['vcpus'] = host_vcpus
|
||||
|
||||
for metric in self.metrics:
|
||||
@@ -297,15 +297,15 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
yield hypervisors
|
||||
|
||||
vm_host_map = []
|
||||
for source_hp_id in self.model.get_all_hypervisors():
|
||||
hypervisors = list(self.model.get_all_hypervisors())
|
||||
for source_hp_id in self.compute_model.get_all_hypervisors():
|
||||
hypervisors = list(self.compute_model.get_all_hypervisors())
|
||||
hypervisors.remove(source_hp_id)
|
||||
hypervisor_list = yield_hypervisors(hypervisors)
|
||||
vms_id = self.model.get_mapping(). \
|
||||
vms_id = self.compute_model.get_mapping(). \
|
||||
get_node_vms_from_id(source_hp_id)
|
||||
for vm_id in vms_id:
|
||||
min_sd_case = {'value': len(self.metrics)}
|
||||
vm = self.model.get_vm_from_id(vm_id)
|
||||
vm = self.compute_model.get_vm_from_id(vm_id)
|
||||
if vm.state not in [vm_state.VMState.ACTIVE.value,
|
||||
vm_state.VMState.PAUSED.value]:
|
||||
continue
|
||||
@@ -347,27 +347,29 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
def create_migration_vm(self, mig_vm, mig_src_hypervisor,
|
||||
mig_dst_hypervisor):
|
||||
"""Create migration VM """
|
||||
if self.model.get_mapping().migrate_vm(
|
||||
if self.compute_model.get_mapping().migrate_vm(
|
||||
mig_vm, mig_src_hypervisor, mig_dst_hypervisor):
|
||||
self.add_migration(mig_vm.uuid, 'live',
|
||||
mig_src_hypervisor.uuid,
|
||||
mig_dst_hypervisor.uuid)
|
||||
|
||||
def migrate(self, vm_uuid, src_host, dst_host):
|
||||
mig_vm = self.model.get_vm_from_id(vm_uuid)
|
||||
mig_src_hypervisor = self.model.get_hypervisor_from_id(src_host)
|
||||
mig_dst_hypervisor = self.model.get_hypervisor_from_id(dst_host)
|
||||
mig_vm = self.compute_model.get_vm_from_id(vm_uuid)
|
||||
mig_src_hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
src_host)
|
||||
mig_dst_hypervisor = self.compute_model.get_hypervisor_from_id(
|
||||
dst_host)
|
||||
self.create_migration_vm(mig_vm, mig_src_hypervisor,
|
||||
mig_dst_hypervisor)
|
||||
|
||||
def fill_solution(self):
|
||||
self.solution.model = self.model
|
||||
self.solution.model = self.compute_model
|
||||
return self.solution
|
||||
|
||||
def pre_execute(self):
|
||||
LOG.info(_LI("Initializing Workload Stabilization"))
|
||||
|
||||
if self.model is None:
|
||||
if self.compute_model is None:
|
||||
raise exception.ClusterStateNotDefined()
|
||||
|
||||
def do_execute(self):
|
||||
@@ -377,12 +379,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
|
||||
min_sd = 1
|
||||
balanced = False
|
||||
for vm_host in migration:
|
||||
dst_hp_disk = self.model.get_resource_from_id(
|
||||
dst_hp_disk = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk).get_capacity(
|
||||
self.model.get_hypervisor_from_id(vm_host['host']))
|
||||
vm_disk = self.model.get_resource_from_id(
|
||||
self.compute_model.get_hypervisor_from_id(
|
||||
vm_host['host']))
|
||||
vm_disk = self.compute_model.get_resource_from_id(
|
||||
resource.ResourceType.disk).get_capacity(
|
||||
self.model.get_vm_from_id(vm_host['vm']))
|
||||
self.compute_model.get_vm_from_id(vm_host['vm']))
|
||||
if vm_disk > dst_hp_disk:
|
||||
continue
|
||||
vm_load = self.calculate_migration_case(hosts_load,
|
||||
|
||||
Reference in New Issue
Block a user