"""Prometheus connectivity check and parallel dashboard metric collection."""

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

from watcher_visio.settings import PROMETHEUS_URL

# Timeout for the lightweight health check and each dashboard query (seconds)
CHECK_TIMEOUT = 5

# Dashboard Prometheus queries (query_key -> PromQL string), run in parallel
DASHBOARD_QUERIES = {
    "hosts_total": "count(node_exporter_build_info{job='node_exporter_compute'})",
    "pcpu_total": (
        "sum(count(node_cpu_seconds_total{job='node_exporter_compute', mode='idle'}) "
        "without (cpu,mode))"
    ),
    "pcpu_usage": "sum(node_load5{job='node_exporter_compute'})",
    "vcpu_allocated": "sum(libvirt_domain_info_virtual_cpus)",
    "vcpu_overcommit_max": (
        "avg(openstack_placement_resource_allocation_ratio{resourcetype='VCPU'})"
    ),
    "pram_total": "sum(node_memory_MemTotal_bytes{job='node_exporter_compute'})",
    "pram_usage": "sum(node_memory_Active_bytes{job='node_exporter_compute'})",
    "vram_allocated": "sum(libvirt_domain_info_maximum_memory_bytes)",
    "vram_overcommit_max": (
        "avg(avg_over_time("
        "openstack_placement_resource_allocation_ratio{resourcetype='MEMORY_MB'}[5m]))"
    ),
    "vm_count": "sum(libvirt_domain_state_code)",
    "vm_active": "sum(libvirt_domain_state_code{stateDesc='the domain is running'})",
}

# Keys whose values are parsed as float (all other keys are parsed as int)
DASHBOARD_FLOAT_KEYS = frozenset(("pcpu_usage", "vcpu_overcommit_max", "vram_overcommit_max"))


def check_prometheus() -> dict:
    """
    Lightweight check that Prometheus is reachable.

    Issues a trivial instant query ("1") and verifies the response shape.

    Returns:
        {"status": "ok"} on success, or
        {"status": "error", "message": "..."} on any failure.
    """
    url = f"{PROMETHEUS_URL.rstrip('/')}/api/v1/query"
    try:
        response = requests.get(url, params={"query": "1"}, timeout=CHECK_TIMEOUT)
        response.raise_for_status()
        data = response.json()
        if "data" in data and "result" in data["data"]:
            return {"status": "ok"}
        return {"status": "error", "message": "Invalid response"}
    except requests.RequestException as e:
        # str(e) can be empty for some connection errors; fall back to a
        # fixed message so the caller never sees a blank reason.
        return {"status": "error", "message": str(e) or "Connection failed"}
    except (ValueError, KeyError) as e:
        # ValueError: body was not JSON; KeyError: unexpected JSON shape.
        return {"status": "error", "message": str(e) or "Invalid response"}


def query_prometheus(query: str) -> str | list:
    """
    Run a single instant query against Prometheus.

    Args:
        query: PromQL query string.

    Returns:
        The scalar value (as a string, Prometheus's native encoding) when the
        result contains exactly one series, otherwise the raw result list.

    Raises:
        requests.RequestException: on network/HTTP failure.
        IndexError: when the query matched no series (empty result).
    """
    # Normalize the base URL the same way check_prometheus does, so a
    # trailing slash in PROMETHEUS_URL does not produce "//api/v1/query".
    url = f"{PROMETHEUS_URL.rstrip('/')}/api/v1/query"
    response = requests.get(url=url, params={"query": query}, timeout=CHECK_TIMEOUT)
    response.raise_for_status()
    result = response.json()["data"]["result"]
    if len(result) > 1:
        return result
    return result[0]["value"][1]


def fetch_dashboard_metrics() -> dict:
    """
    Run all dashboard Prometheus queries in parallel.

    Returns:
        dict mapping query_key -> parsed numeric value (float for keys in
        DASHBOARD_FLOAT_KEYS, int otherwise). A key whose query result cannot
        be parsed (or matched no series) falls back to zero so the dashboard
        still renders.

    Note: network/HTTP errors raised by individual queries are NOT swallowed
    here and propagate to the caller.
    """
    result: dict = {}
    with ThreadPoolExecutor(max_workers=len(DASHBOARD_QUERIES)) as executor:
        future_to_key = {
            executor.submit(query_prometheus, query=q): key
            for key, q in DASHBOARD_QUERIES.items()
        }
        for future in as_completed(future_to_key):
            key = future_to_key[future]
            try:
                raw = future.result()
                if key in DASHBOARD_FLOAT_KEYS:
                    result[key] = float(raw)
                else:
                    # Prometheus encodes every sample value as a string that
                    # may be in float/exponent form (e.g. "6.4e+10"); int()
                    # alone rejects those, so go through float first.
                    result[key] = int(float(raw))
            except (ValueError, TypeError, IndexError):
                # ValueError/TypeError: unparseable value (e.g. a multi-series
                # list). IndexError: query matched no series — previously this
                # escaped the handler and crashed the whole fetch.
                result[key] = 0.0 if key in DASHBOARD_FLOAT_KEYS else 0
    return result