Files
watcher-visio/dashboard/prometheus_utils/query.py
Nikolay Tatarinov 876bfa9d2e
Some checks failed
CI / ci (push) Has been cancelled
Update Prometheus query function to include timeout parameter
- Added a timeout parameter to the requests.get call in the query_prometheus function to enhance reliability and prevent hanging requests.
2026-02-07 18:03:55 +03:00

39 lines
1.3 KiB
Python

import requests
from watcher_visio.settings import PROMETHEUS_URL
# Timeout for lightweight health check (seconds)
CHECK_TIMEOUT = 5
def check_prometheus() -> dict:
"""
Lightweight check that Prometheus is reachable.
Returns {"status": "ok"} or {"status": "error", "message": "..."}.
"""
url = f"{PROMETHEUS_URL.rstrip('/')}/api/v1/query"
try:
response = requests.get(url, params={"query": "1"}, timeout=CHECK_TIMEOUT)
response.raise_for_status()
data = response.json()
if "data" in data and "result" in data["data"]:
return {"status": "ok"}
return {"status": "error", "message": "Invalid response"}
except requests.RequestException as e:
return {"status": "error", "message": str(e) or "Connection failed"}
except (ValueError, KeyError) as e:
return {"status": "error", "message": str(e) or "Invalid response"}
def query_prometheus(query: str) -> str | list[str]:
url = f"{PROMETHEUS_URL}/api/v1/query"
params = {
"query": query,
}
response = requests.get(url=url, params=params, timeout=CHECK_TIMEOUT)
response.raise_for_status()
result = response.json()["data"]["result"]
if len(result) > 1:
return result
else:
return result[0]["value"][1]