from copy import copy import pandas from openstack.connection import Connection from watcher_visio.settings import PROMETHEUS_METRICS, WATCHER_ENDPOINT_NAME, WATCHER_INTERFACE_NAME from dashboard.prometheus_utils.query import query_prometheus def convert_cpu_data(data: list): metrics = [] if not data: return pandas.DataFrame(columns=["host", "cpu_usage"]) for entry in data: for t, val in entry["values"]: metrics.append( { "timestamp": int(t), "host": entry["metric"]["host"], "cpu_usage": float(val), "instance": entry["metric"]["instanceName"], } ) df_cpu = pandas.DataFrame(metrics) df_cpu["timestamp"] = pandas.to_datetime(df_cpu["timestamp"], unit="s") # Aggregate CPU usage per host return ( df_cpu.groupby(["host", "timestamp"])["cpu_usage"] .sum() .groupby("host") .mean() .reset_index() ) def get_current_cluster_cpu(connection: Connection) -> dict: """Return current per-host CPU state for the cluster (no Watcher dependency).""" cpu_data = query_prometheus(PROMETHEUS_METRICS["cpu_usage"]) cpu_metrics = convert_cpu_data(data=cpu_data) if cpu_metrics.empty: return {"host_labels": [], "cpu_current": []} return { "host_labels": cpu_metrics["host"].to_list(), "cpu_current": cpu_metrics["cpu_usage"].to_list(), } def _fetch_audits_and_action_plans(session, watcher_endpoint): """GET audits and action_plans from Watcher API. Returns (audits_list, action_plans_list).""" audits_resp = session.get(f"{watcher_endpoint}/v1/audits") audits_resp.raise_for_status() audits_list = audits_resp.json().get("audits") or [] actionplans_resp = session.get(f"{watcher_endpoint}/v1/action_plans") actionplans_resp.raise_for_status() action_plans_list = actionplans_resp.json().get("action_plans") or [] return audits_list, action_plans_list def _fetch_migrations_for_audit( connection, session, watcher_endpoint, audit_resp, actionplan, actions_resp ): """ Fetch action details for the given action plan and build migrations list and instance->destination mapping. Returns (migrations, mapping). """ migrations = [] mapping = {} for action in actions_resp: action_resp = session.get(f"{watcher_endpoint}/v1/actions/{action['uuid']}") action_resp.raise_for_status() action_resp = action_resp.json() server = connection.get_server_by_id(action_resp["input_parameters"]["resource_id"]) params = action_resp["input_parameters"] mapping[params["resource_name"]] = params["destination_node"] migrations.append( { "instanceName": params["resource_name"], "source": params["source_node"], "destination": params["destination_node"], "flavor": server.flavor.name, "impact": "Low", } ) return migrations, mapping def _build_projected_cpu_metrics(cpu_data, mapping): """ Apply instance->destination mapping to a copy of cpu_data and return aggregated CPU metrics DataFrame (host, cpu_usage). """ projected_cpu_data = copy(cpu_data) for entry in projected_cpu_data: if (instance := entry["metric"]["instanceName"]) in mapping: entry["metric"]["host"] = mapping[instance] return convert_cpu_data(projected_cpu_data) def get_audits(connection: Connection) -> list[dict] | None: session = connection.session watcher_endpoint = connection.endpoint_for( service_type=WATCHER_ENDPOINT_NAME, interface=WATCHER_INTERFACE_NAME ) cpu_data = query_prometheus(PROMETHEUS_METRICS["cpu_usage"]) cpu_metrics = convert_cpu_data(data=cpu_data) _, action_plans_list = _fetch_audits_and_action_plans(session, watcher_endpoint) pending_audits = [plan for plan in action_plans_list if plan["state"] == "RECOMMENDED"] result = [] for item in pending_audits: audit_resp = session.get(f"{watcher_endpoint}/v1/audits/{item['audit_uuid']}") audit_resp.raise_for_status() audit_resp = audit_resp.json() actionplan = next( filter(lambda x: x.get("audit_uuid") == audit_resp["uuid"], action_plans_list), None ) if actionplan is None: continue actions_resp = session.get( f"{watcher_endpoint}/v1/actions/?action_plan_uuid={actionplan['uuid']}" ) actions_resp.raise_for_status() actions_resp = actions_resp.json().get("actions") or [] migrations, mapping = _fetch_migrations_for_audit( connection, session, watcher_endpoint, audit_resp, actionplan, actions_resp ) projected_cpu_metrics = _build_projected_cpu_metrics(cpu_data, mapping) result.append( { "id": audit_resp["uuid"], "name": audit_resp["name"], "created_at": audit_resp["created_at"], "strategy": audit_resp["strategy_name"], "goal": audit_resp["goal_name"], "type": audit_resp["audit_type"], "scope": audit_resp["scope"], "cpu_weight": audit_resp["parameters"] .get("weights", {}) .get("instance_cpu_usage_weight", "none"), "ram_weight": audit_resp["parameters"] .get("weights", {}) .get("instance_ram_usage_weight", "none"), "migrations": migrations, "host_labels": cpu_metrics["host"].to_list(), "cpu_current": cpu_metrics["cpu_usage"].to_list(), "cpu_projected": projected_cpu_metrics["cpu_usage"].to_list(), } ) return result