Files
duty-teller/duty_teller/api/dependencies.py
Nikolay Tatarinov 86f6d66865
All checks were successful
CI / lint-and-test (push) Successful in 17s
chore: add changelog and documentation updates
- Created a new `CHANGELOG.md` file to document all notable changes to the project, adhering to the Keep a Changelog format.
- Updated `CONTRIBUTING.md` to include instructions for building and previewing documentation using MkDocs.
- Added `mkdocs.yml` configuration for documentation generation, including navigation structure and theme settings.
- Enhanced various documentation files, including API reference, architecture overview, configuration reference, and runbook, to provide comprehensive guidance for users and developers.
- Included new sections in the README for changelog and documentation links, improving accessibility to project information.
2026-02-20 15:32:10 +03:00

209 lines
7.3 KiB
Python

"""FastAPI dependencies: DB session, Miniapp auth (initData/allowlist), date validation."""
import logging
import re
from typing import Annotated, Generator
from fastapi import Depends, Header, HTTPException, Query, Request
from sqlalchemy.orm import Session
import duty_teller.config as config
from duty_teller.api.telegram_auth import validate_init_data_with_reason
from duty_teller.db.repository import get_duties, get_user_by_telegram_id
from duty_teller.db.schemas import DutyWithUser
from duty_teller.db.session import session_scope
from duty_teller.i18n import t
from duty_teller.utils.dates import validate_date_range
log = logging.getLogger(__name__)
# Matches the leading language tag of one Accept-Language entry and captures its
# 2-3 letter primary subtag (e.g. "ru-RU;q=0.9" -> "ru"). At most one "-subtag"
# is allowed, and the tag must be followed by ";", "," or the end of the string.
_ACCEPT_LANG_TAG_RE = re.compile(r"^([a-zA-Z]{2,3})(?:-[a-zA-Z0-9]+)?\s*(?:;|,|$)")
def _lang_from_accept_language(header: str | None) -> str:
    """Map an Accept-Language header value to a supported UI language.

    Only the first comma-separated entry is inspected; q-values and any later
    entries are ignored.

    Args:
        header: Raw Accept-Language header value
            (e.g. "ru-RU,ru;q=0.9,en;q=0.8"), or None when the header is absent.

    Returns:
        config.DEFAULT_LANGUAGE when the header is missing or blank; otherwise
        "ru" if the first entry's primary tag starts with "ru", else "en"
        (also when the first entry does not look like a language tag).
    """
    stripped = (header or "").strip()
    if not stripped:
        return config.DEFAULT_LANGUAGE

    leading_entry = stripped.partition(",")[0].strip()
    tag_match = _ACCEPT_LANG_TAG_RE.match(leading_entry)
    if tag_match is None:
        return "en"

    if tag_match.group(1).lower().startswith("ru"):
        return "ru"
    return "en"
def _auth_error_detail(auth_reason: str, lang: str) -> str:
    """Pick the translated error text for a failed initData validation.

    Args:
        auth_reason: Failure reason reported by initData validation.
        lang: Language code passed to the translator.

    Returns:
        The "api.auth_bad_signature" translation when the reason is
        "hash_mismatch", otherwise the generic "api.auth_invalid" translation.
    """
    message_key = (
        "api.auth_bad_signature"
        if auth_reason == "hash_mismatch"
        else "api.auth_invalid"
    )
    return t(lang, message_key)
def _validate_duty_dates(from_date: str, to_date: str, lang: str) -> None:
    """Run validate_date_range and convert its ValueError into an HTTP 400.

    The translated message is chosen by inspecting the error text: if it
    mentions "YYYY-MM-DD" or "формате" it is reported as a format problem,
    anything else as "from is after to".

    Args:
        from_date: Start date string.
        to_date: End date string.
        lang: Language code used for the translated error detail.

    Raises:
        HTTPException: 400 with a translated detail when validation fails.
    """
    try:
        validate_date_range(from_date, to_date)
    except ValueError as exc:
        reason = str(exc)
        format_markers = ("YYYY-MM-DD", "формате")
        is_format_error = any(marker in reason for marker in format_markers)
        message_key = "dates.bad_format" if is_format_error else "dates.from_after_to"
        raise HTTPException(status_code=400, detail=t(lang, message_key)) from exc
def get_validated_dates(
    request: Request,
    from_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="from"),
    to_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="to"),
) -> tuple[str, str]:
    """FastAPI dependency: check the ``from``/``to`` query params as a date range.

    The language of any error message is derived from the request's
    Accept-Language header.

    Args:
        request: FastAPI request (source of the Accept-Language header).
        from_date: Start date YYYY-MM-DD (query param ``from``).
        to_date: End date YYYY-MM-DD (query param ``to``).

    Returns:
        The unchanged (from_date, to_date) pair of strings.

    Raises:
        HTTPException: 400 if the range fails validation.
    """
    accept_language = request.headers.get("Accept-Language")
    error_lang = _lang_from_accept_language(accept_language)
    _validate_duty_dates(from_date, to_date, error_lang)
    return from_date, to_date
def get_db_session() -> Generator[Session, None, None]:
    """Provide a DB session to a request as a generator dependency.

    The session is obtained from session_scope(config.DATABASE_URL); when the
    generator is finalized the ``with`` block exits and that context manager
    performs the cleanup.
    """
    with session_scope(config.DATABASE_URL) as db:
        yield db
def require_miniapp_username(
    request: Request,
    x_telegram_init_data: Annotated[
        str | None, Header(alias="X-Telegram-Init-Data")
    ] = None,
    session: Session = Depends(get_db_session),
) -> str:
    """FastAPI dependency: enforce Miniapp auth and return the caller's identifier.

    Binds the X-Telegram-Init-Data header and a DB session, then delegates the
    actual checks to get_authenticated_username.

    Returns:
        Whatever get_authenticated_username returns for this request.

    Raises:
        HTTPException: 403 if initData is missing/invalid or the user is not in
            the allowlist.
    """
    identifier = get_authenticated_username(request, x_telegram_init_data, session)
    return identifier
def _is_private_client(client_host: str | None) -> bool:
if not client_host:
return False
if client_host in ("127.0.0.1", "::1"):
return True
parts = client_host.split(".")
if len(parts) == 4:
try:
a, b, c, d = (int(x) for x in parts)
if (a == 10) or (a == 172 and 16 <= b <= 31) or (a == 192 and b == 168):
return True
except (ValueError, IndexError):
pass
return False
def get_authenticated_username(
    request: Request,
    x_telegram_init_data: str | None,
    session: Session,
) -> str:
    """Authenticate a Miniapp request and return an identifier for the caller.

    Checks run in this order:
        1. MINI_APP_SKIP_AUTH set: allow everyone (returns "").
        2. No initData: allow only clients that _is_private_client accepts
           (returns ""), otherwise 403.
        3. initData is validated against the bot token; failure is a 403.
        4. Username allowlist, then phone allowlist via the stored user record.

    NOTE(review): step 2 trusts request.client.host; if the app is deployed
    behind a reverse proxy this may be the proxy's address - confirm.

    Args:
        request: FastAPI request (client host and Accept-Language header).
        x_telegram_init_data: Raw X-Telegram-Init-Data header value.
        session: DB session used to look the user up by telegram id.

    Returns:
        Username, full_name, or "id:<telegram_id>"; empty string when auth is
        skipped (MINI_APP_SKIP_AUTH, or private client without initData).

    Raises:
        HTTPException: 403 if initData is missing/invalid or the user is not in
            any allowlist.
    """
    if config.MINI_APP_SKIP_AUTH:
        log.warning("allowing without any auth check (MINI_APP_SKIP_AUTH is set)")
        return ""

    raw_init_data = (x_telegram_init_data or "").strip()
    if not raw_init_data:
        peer = request.client
        client_host = peer.host if peer else None
        if _is_private_client(client_host):
            return ""
        log.warning("no X-Telegram-Init-Data header (client=%s)", client_host)
        # No initData means no Telegram language hint; fall back to the header.
        lang = _lang_from_accept_language(request.headers.get("Accept-Language"))
        raise HTTPException(status_code=403, detail=t(lang, "api.open_from_telegram"))

    # A falsy configured max age is passed on as None.
    telegram_user_id, username, auth_reason, lang = validate_init_data_with_reason(
        raw_init_data,
        config.BOT_TOKEN,
        max_age_seconds=config.INIT_DATA_MAX_AGE_SECONDS or None,
    )
    if auth_reason != "ok":
        log.warning("initData validation failed: %s", auth_reason)
        raise HTTPException(
            status_code=403, detail=_auth_error_detail(auth_reason, lang)
        )

    if username and config.can_access_miniapp(username):
        return username

    # Username is not allowlisted: try the phone stored for this telegram id.
    failed_phone: str | None = None
    user = (
        get_user_by_telegram_id(session, telegram_user_id)
        if telegram_user_id is not None
        else None
    )
    phone = user.phone if user else None
    if phone:
        if config.can_access_miniapp_by_phone(phone):
            return username or (user.full_name or "") or f"id:{telegram_user_id}"
        failed_phone = config.normalize_phone(phone)

    log.warning(
        "username/phone not in allowlist (username=%s, telegram_id=%s, phone=%s)",
        username,
        telegram_user_id,
        failed_phone or "",
    )
    raise HTTPException(status_code=403, detail=t(lang, "api.access_denied"))
def fetch_duties_response(
    session: Session, from_date: str, to_date: str
) -> list[DutyWithUser]:
    """Query duties for a date range and shape them for the API response.

    Each row from get_duties is a (duty, full_name) pair. An event_type other
    than "duty", "unavailable" or "vacation" is reported as "duty".

    Args:
        session: DB session.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of DutyWithUser (id, user_id, start_at, end_at, full_name,
        event_type), in the order returned by get_duties.
    """
    known_event_types = ("duty", "unavailable", "vacation")
    response: list[DutyWithUser] = []
    for duty, full_name in get_duties(session, from_date=from_date, to_date=to_date):
        event_type = duty.event_type
        if event_type not in known_event_types:
            event_type = "duty"
        response.append(
            DutyWithUser(
                id=duty.id,
                user_id=duty.user_id,
                start_at=duty.start_at,
                end_at=duty.end_at,
                full_name=full_name,
                event_type=event_type,
            )
        )
    return response