import pandas from copy import copy from openstack.connection import Connection from watcher_visio.settings import WATCHER_ENDPOINT_NAME, WATCHER_INTERFACE_NAME, PROMETHEUS_METRICS from dashboard.prometheus_utils.query import query_prometheus def convert_cpu_data(data: list): metrics = [] if not data: return pandas.DataFrame(columns=["host", "cpu_usage"]) for entry in data: for t, val in entry["values"]: metrics.append({ "timestamp": int(t), "host": entry["metric"]["host"], "cpu_usage": float(val), "instance": entry["metric"]["instanceName"] }) df_cpu = pandas.DataFrame(metrics) df_cpu["timestamp"] = pandas.to_datetime(df_cpu["timestamp"], unit="s") # Aggregate CPU usage per host return ( df_cpu.groupby(["host", "timestamp"])["cpu_usage"].sum() .groupby("host").mean() .reset_index() ) def get_audits(connection: Connection) -> list[dict] | None: session = connection.session watcher_endpoint = connection.endpoint_for( service_type=WATCHER_ENDPOINT_NAME, interface=WATCHER_INTERFACE_NAME ) # Collect instances prometheus metrics cpu_data = query_prometheus(PROMETHEUS_METRICS['cpu_usage']) cpu_metrics = convert_cpu_data(data=cpu_data) # Fetch audit list audits_resp = session.get( f"{watcher_endpoint}/v1/audits" ) audits_resp.raise_for_status() audits_resp = audits_resp.json().get('audits') or [] # Fetch action plan list actionplans_resp = session.get( f"{watcher_endpoint}/v1/action_plans" ) actionplans_resp.raise_for_status() actionplans_resp = actionplans_resp.json().get('action_plans') or [] # Filtering audits by PENDING state pending_audits = [plan for plan in actionplans_resp if plan['state'] == "RECOMMENDED"] result = [] for item in pending_audits: projected_cpu_data = copy(cpu_data) audit_resp = session.get( f"{watcher_endpoint}/v1/audits/{item['audit_uuid']}" ) audit_resp.raise_for_status() audit_resp = audit_resp.json() actionplan = next(filter(lambda x: x.get("audit_uuid") == audit_resp['uuid'], actionplans_resp), None) if actionplan is None: continue actions_resp = session.get( f"{watcher_endpoint}/v1/actions/?action_plan_uuid={actionplan['uuid']}" ) actions_resp.raise_for_status() actions_resp = actions_resp.json().get('actions') or [] migrations = [] mapping = {} for action in actions_resp: action_resp = session.get( f"{watcher_endpoint}/v1/actions/{action['uuid']}" ) action_resp.raise_for_status() action_resp = action_resp.json() server = connection.get_server_by_id(action_resp['input_parameters']['resource_id']) params = action_resp['input_parameters'] mapping[params['resource_name']] = params['destination_node'] migrations.append({ "instanceName": action_resp['input_parameters']['resource_name'], "source": action_resp['input_parameters']['source_node'], "destination": action_resp['input_parameters']['destination_node'], "flavor": server.flavor.name, "impact": 'Low' }) for entry in projected_cpu_data: if (instance := entry['metric']['instanceName']) in mapping: entry['metric']['host'] = mapping[instance] projected_cpu_metrics = convert_cpu_data(projected_cpu_data) result.append({ "id": audit_resp['uuid'], "name": audit_resp['name'], "created_at": audit_resp['created_at'], "strategy": audit_resp['strategy_name'], "goal": audit_resp['goal_name'], "type": audit_resp['audit_type'], "scope": audit_resp['scope'], "cpu_weight": audit_resp['parameters'].get('weights', {}).get('instance_cpu_usage_weight', "none"), "ram_weight": audit_resp['parameters'].get('weights', {}).get('instance_ram_usage_weight', "none"), "migrations": migrations, "host_labels": cpu_metrics['host'].to_list(), "cpu_current": cpu_metrics['cpu_usage'].to_list(), "cpu_projected": projected_cpu_metrics['cpu_usage'].to_list(), }) return result