Some checks failed
CI / ci (push) Failing after 14s
- Standardized string quotes across multiple files to use double quotes for consistency. - Improved formatting of JSON dumps in mock data for better readability. - Enhanced the structure of various functions and data definitions for clarity. - Updated test cases to reflect changes in data structure and ensure accuracy.
39 lines
1.3 KiB
Python
39 lines
1.3 KiB
Python
import requests
|
|
from watcher_visio.settings import PROMETHEUS_URL
|
|
|
|
# Timeout for lightweight health check (seconds)
|
|
CHECK_TIMEOUT = 5
|
|
|
|
|
|
def check_prometheus() -> dict:
|
|
"""
|
|
Lightweight check that Prometheus is reachable.
|
|
Returns {"status": "ok"} or {"status": "error", "message": "..."}.
|
|
"""
|
|
url = f"{PROMETHEUS_URL.rstrip('/')}/api/v1/query"
|
|
try:
|
|
response = requests.get(url, params={"query": "1"}, timeout=CHECK_TIMEOUT)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if "data" in data and "result" in data["data"]:
|
|
return {"status": "ok"}
|
|
return {"status": "error", "message": "Invalid response"}
|
|
except requests.RequestException as e:
|
|
return {"status": "error", "message": str(e) or "Connection failed"}
|
|
except (ValueError, KeyError) as e:
|
|
return {"status": "error", "message": str(e) or "Invalid response"}
|
|
|
|
|
|
def query_prometheus(query: str) -> str | list[str]:
|
|
url = f"{PROMETHEUS_URL}/api/v1/query"
|
|
params = {
|
|
"query": query,
|
|
}
|
|
response = requests.get(url=url, params=params)
|
|
response.raise_for_status()
|
|
result = response.json()["data"]["result"]
|
|
if len(result) > 1:
|
|
return result
|
|
else:
|
|
return result[0]["value"][1]
|