Files
duty-teller/duty_teller/api/dependencies.py
Nikolay Tatarinov 0ecbda67f9
All checks were successful
CI / lint-and-test (push) Successful in 19s
chore: add coverage reporting and improve documentation
- Added `pytest-cov` as a development dependency for coverage reporting.
- Configured pytest to include coverage options, ensuring code coverage is reported and enforced.
- Updated the README to include contributing guidelines and logging policies, enhancing clarity for developers.
- Added a new section in the configuration documentation emphasizing the necessity of serving the application over HTTPS in production for security purposes.
- Introduced a new `.coverage` file to track test coverage metrics.
2026-02-20 16:18:59 +03:00

221 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FastAPI dependencies: DB session, Miniapp auth (initData/allowlist), date validation."""
import logging
import re
from typing import Annotated, Generator
from fastapi import Depends, Header, HTTPException, Query, Request
from sqlalchemy.orm import Session
import duty_teller.config as config
from duty_teller.api.telegram_auth import validate_init_data_with_reason
from duty_teller.db.repository import get_duties, get_user_by_telegram_id
from duty_teller.db.schemas import DutyWithUser
from duty_teller.db.session import session_scope
from duty_teller.i18n import t
from duty_teller.utils.dates import validate_date_range
log = logging.getLogger(__name__)
# First language tag from Accept-Language (e.g. "ru-RU,ru;q=0.9,en;q=0.8" -> "ru")
_ACCEPT_LANG_TAG_RE = re.compile(r"^([a-zA-Z]{2,3})(?:-[a-zA-Z0-9]+)?\s*(?:;|,|$)")
def _lang_from_accept_language(header: str | None) -> str:
"""Normalize Accept-Language header to 'ru' or 'en'; fallback to config.DEFAULT_LANGUAGE.
Args:
header: Raw Accept-Language header value (e.g. "ru-RU,ru;q=0.9,en;q=0.8").
Returns:
'ru' or 'en'.
"""
if not header or not header.strip():
return config.DEFAULT_LANGUAGE
first = header.strip().split(",")[0].strip()
m = _ACCEPT_LANG_TAG_RE.match(first)
if not m:
return "en"
code = m.group(1).lower()
return "ru" if code.startswith("ru") else "en"
def _auth_error_detail(auth_reason: str, lang: str) -> str:
"""Return translated auth error message."""
if auth_reason == "hash_mismatch":
return t(lang, "api.auth_bad_signature")
return t(lang, "api.auth_invalid")
def _validate_duty_dates(from_date: str, to_date: str, lang: str) -> None:
"""Validate date range; raise HTTPException with translated detail."""
try:
validate_date_range(from_date, to_date)
except ValueError as e:
msg = str(e)
if "YYYY-MM-DD" in msg or "формате" in msg:
detail = t(lang, "dates.bad_format")
else:
detail = t(lang, "dates.from_after_to")
raise HTTPException(status_code=400, detail=detail) from e
def get_validated_dates(
request: Request,
from_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="from"),
to_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="to"),
) -> tuple[str, str]:
"""Validate from/to date query params; use Accept-Language for error messages.
Args:
request: FastAPI request (for Accept-Language).
from_date: Start date YYYY-MM-DD.
to_date: End date YYYY-MM-DD.
Returns:
(from_date, to_date) as strings.
Raises:
HTTPException: 400 if format invalid or from_date > to_date.
"""
lang = _lang_from_accept_language(request.headers.get("Accept-Language"))
_validate_duty_dates(from_date, to_date, lang)
return (from_date, to_date)
def get_db_session() -> Generator[Session, None, None]:
"""Yield a DB session for the request; closed automatically by FastAPI."""
with session_scope(config.DATABASE_URL) as session:
yield session
def require_miniapp_username(
request: Request,
x_telegram_init_data: Annotated[
str | None, Header(alias="X-Telegram-Init-Data")
] = None,
session: Session = Depends(get_db_session),
) -> str:
"""FastAPI dependency: require valid Miniapp auth; return username/identifier.
Raises:
HTTPException: 403 if initData missing/invalid or user not in allowlist.
"""
return get_authenticated_username(request, x_telegram_init_data, session)
def _is_private_client(client_host: str | None) -> bool:
"""Return True if client_host is localhost or RFC 1918 private IPv4.
Used to allow /api/duties without initData when opened from local/private
network (e.g. dev). IPv4 only; IPv6 only 127/::1 checked.
Args:
client_host: Client IP or hostname from request.
Returns:
True if loopback or 10.x, 172.1631.x, 192.168.x.x.
"""
if not client_host:
return False
if client_host in ("127.0.0.1", "::1"):
return True
# RFC 1918 private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
parts = client_host.split(".")
if len(parts) == 4:
try:
a, b, c, d = (int(x) for x in parts)
if (a == 10) or (a == 172 and 16 <= b <= 31) or (a == 192 and b == 168):
return True
except (ValueError, IndexError):
pass
return False
def get_authenticated_username(
request: Request,
x_telegram_init_data: str | None,
session: Session,
) -> str:
"""Return identifier for miniapp auth (username or full_name or id:...); empty if skip-auth.
Args:
request: FastAPI request (client host for private-IP bypass).
x_telegram_init_data: Raw X-Telegram-Init-Data header value.
session: DB session (for phone allowlist lookup).
Returns:
Username, full_name, or "id:<telegram_id>"; empty string if MINI_APP_SKIP_AUTH
or private IP and no initData.
Raises:
HTTPException: 403 if initData missing/invalid or user not in allowlist.
"""
if config.MINI_APP_SKIP_AUTH:
log.warning("allowing without any auth check (MINI_APP_SKIP_AUTH is set)")
return ""
init_data = (x_telegram_init_data or "").strip()
if not init_data:
client_host = request.client.host if request.client else None
if _is_private_client(client_host):
return ""
log.warning("no X-Telegram-Init-Data header (client=%s)", client_host)
lang = _lang_from_accept_language(request.headers.get("Accept-Language"))
raise HTTPException(status_code=403, detail=t(lang, "api.open_from_telegram"))
max_age = config.INIT_DATA_MAX_AGE_SECONDS or None
telegram_user_id, username, auth_reason, lang = validate_init_data_with_reason(
init_data, config.BOT_TOKEN, max_age_seconds=max_age
)
if auth_reason != "ok":
log.warning("initData validation failed: %s", auth_reason)
raise HTTPException(
status_code=403, detail=_auth_error_detail(auth_reason, lang)
)
if username and config.can_access_miniapp(username):
return username
failed_phone: str | None = None
if telegram_user_id is not None:
user = get_user_by_telegram_id(session, telegram_user_id)
if user and user.phone and config.can_access_miniapp_by_phone(user.phone):
return username or (user.full_name or "") or f"id:{telegram_user_id}"
if user and user.phone:
failed_phone = config.normalize_phone(user.phone)
log.warning(
"username/phone not in allowlist (username=%s, telegram_id=%s, phone=%s)",
username,
telegram_user_id,
failed_phone if failed_phone else "",
)
raise HTTPException(status_code=403, detail=t(lang, "api.access_denied"))
def fetch_duties_response(
session: Session, from_date: str, to_date: str
) -> list[DutyWithUser]:
"""Load duties in range and return as DutyWithUser list for API response.
Args:
session: DB session.
from_date: Start date YYYY-MM-DD.
to_date: End date YYYY-MM-DD.
Returns:
List of DutyWithUser (id, user_id, start_at, end_at, full_name, event_type).
"""
rows = get_duties(session, from_date=from_date, to_date=to_date)
return [
DutyWithUser(
id=duty.id,
user_id=duty.user_id,
start_at=duty.start_at,
end_at=duty.end_at,
full_name=full_name,
event_type=(
duty.event_type
if duty.event_type in ("duty", "unavailable", "vacation")
else "duty"
),
)
for duty, full_name in rows
]