import requests

from watcher_visio.settings import PROMETHEUS_URL

# Timeout (seconds) applied to every Prometheus HTTP request in this module.
CHECK_TIMEOUT = 5


def check_prometheus() -> dict:
    """Run a lightweight check that Prometheus is reachable.

    Issues the trivial instant query ``1`` against ``/api/v1/query`` and
    verifies that the JSON body contains ``data.result``.

    Returns:
        ``{"status": "ok"}`` on success, otherwise
        ``{"status": "error", "message": "..."}``. Request failures and
        malformed responses are reported through the returned dict rather
        than raised.
    """
    url = f"{PROMETHEUS_URL.rstrip('/')}/api/v1/query"
    try:
        response = requests.get(url, params={"query": "1"}, timeout=CHECK_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except requests.RequestException as e:
        return {"status": "error", "message": str(e) or "Connection failed"}
    except ValueError as e:
        # Body was not valid JSON.
        return {"status": "error", "message": str(e) or "Invalid response"}

    # Validate the shape explicitly: a body such as {"data": null} or a bare
    # JSON string would otherwise raise TypeError on the membership test and
    # escape the health check as an unhandled exception.
    data = payload.get("data") if isinstance(payload, dict) else None
    if isinstance(data, dict) and "result" in data:
        return {"status": "ok"}
    return {"status": "error", "message": "Invalid response"}


def query_prometheus(query: str) -> str | list:
    """Execute an instant query against Prometheus.

    Args:
        query: Query expression sent as the ``query`` parameter.

    Returns:
        The raw ``data.result`` list when it holds more than one element;
        otherwise ``["value"][1]`` of the single element (presumably the
        sample value as a string, per the Prometheus HTTP API -- confirm
        against callers).

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        IndexError: If the query returned no results.
        KeyError: If the response body lacks the expected keys.
    """
    # Strip a trailing slash so the URL is built the same way as in
    # check_prometheus and never contains "//api".
    url = f"{PROMETHEUS_URL.rstrip('/')}/api/v1/query"
    response = requests.get(url=url, params={"query": query}, timeout=CHECK_TIMEOUT)
    response.raise_for_status()
    result = response.json()["data"]["result"]
    if len(result) > 1:
        return result
    if not result:
        # Same exception type as the implicit result[0] failure, but with a
        # message that identifies the query that came back empty.
        raise IndexError(f"Prometheus query returned no results: {query!r}")
    return result[0]["value"][1]