- Improved formatting and readability in config.py and other files by adding line breaks. - Introduced INIT_DATA_MAX_AGE_SECONDS to enforce replay protection for Telegram initData. - Updated validate_init_data function to include max_age_seconds parameter for validation. - Enhanced API to reject old initData based on the new max_age_seconds setting. - Added tests for auth_date expiry and validation of initData in test_telegram_auth.py. - Updated README with details on the new INIT_DATA_MAX_AGE_SECONDS configuration.
128 lines
4.9 KiB
Python
128 lines
4.9 KiB
Python
"""FastAPI app: /api/duties and static webapp."""
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import config
|
|
from fastapi import FastAPI, Header, HTTPException, Query, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
|
from db.session import session_scope
|
|
from db.repository import get_duties
|
|
from db.schemas import DutyWithUser
|
|
from api.telegram_auth import validate_init_data
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Layout check for an ISO date written as YYYY-MM-DD: four digits, two digits,
# two digits, separated by hyphens. It does not check that the day exists.
_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
|
|
|
|
|
|
def _validate_duty_dates(from_date: str, to_date: str) -> None:
    """Validate the ``from``/``to`` query parameters of the duties endpoint.

    Both values must be written as YYYY-MM-DD and name a real calendar day
    (so ``2024-02-30`` or ``2024-13-01`` are rejected), and ``from_date`` must
    not be later than ``to_date``.

    Raises:
        HTTPException: status 400 if either value is malformed or not a real
            date, or if ``from_date`` is after ``to_date``.
    """
    from datetime import date  # local import: only this check needs it

    def _parse_day(value: str) -> date | None:
        """Return the date for *value*, or None if it is not a real YYYY-MM-DD day."""
        # fullmatch() rather than match(): with a trailing "$" in the pattern,
        # match() would still accept a value that ends with a newline.
        if not _DATE_RE.fullmatch(value):
            return None
        try:
            # The regex checks only the digit layout; this rejects impossible
            # months and days.
            return date.fromisoformat(value)
        except ValueError:
            return None

    start = _parse_day(from_date)
    end = _parse_day(to_date)
    if start is None or end is None:
        raise HTTPException(
            status_code=400,
            detail="Параметры from и to должны быть в формате YYYY-MM-DD",
        )
    if start > end:
        raise HTTPException(
            status_code=400,
            detail="Дата from не должна быть позже to",
        )
|
|
|
|
|
|
def _fetch_duties_response(from_date: str, to_date: str) -> list[DutyWithUser]:
    """Load the duties in the given range and convert them to response models.

    Opens a session with ``session_scope(config.DATABASE_URL)``, queries
    ``get_duties`` for the range, and turns every ``(duty, full_name)`` row
    into a ``DutyWithUser``.
    """
    with session_scope(config.DATABASE_URL) as session:
        response: list[DutyWithUser] = []
        for duty, full_name in get_duties(
            session, from_date=from_date, to_date=to_date
        ):
            item = DutyWithUser(
                id=duty.id,
                user_id=duty.user_id,
                start_at=duty.start_at,
                end_at=duty.end_at,
                full_name=full_name,
            )
            response.append(item)
        return response
|
|
|
|
|
|
def _is_private_client(client_host: str | None) -> bool:
|
|
"""True if client is localhost or private LAN (dev / same-machine access).
|
|
|
|
Note: Behind a reverse proxy (e.g. nginx, Caddy), request.client.host is often
|
|
the proxy address (e.g. 127.0.0.1). Then "private client" would be true for all
|
|
requests when initData is missing. For production, either rely on the Mini App
|
|
always sending initData, or configure the proxy to forward the real client IP
|
|
(e.g. X-Forwarded-For) and use that for this check. Do not rely on the private-IP
|
|
bypass when deployed behind a proxy without one of these measures.
|
|
"""
|
|
if not client_host:
|
|
return False
|
|
if client_host in ("127.0.0.1", "::1"):
|
|
return True
|
|
parts = client_host.split(".")
|
|
if len(parts) == 4: # IPv4
|
|
try:
|
|
a, b, c, d = (int(x) for x in parts)
|
|
if (a == 10) or (a == 172 and 16 <= b <= 31) or (a == 192 and b == 168):
|
|
return True
|
|
except (ValueError, IndexError):
|
|
pass
|
|
return False
|
|
|
|
|
|
# The FastAPI application object; routes and mounts below attach to it.
app = FastAPI(title="Duty Teller API")

# CORS: allowed origins come from config; credentials are allowed, and every
# method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=config.CORS_ORIGINS,
)
|
|
|
|
|
|
@app.get("/api/duties", response_model=list[DutyWithUser])
def list_duties(
    request: Request,
    from_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="from"),
    to_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="to"),
    x_telegram_init_data: str | None = Header(None, alias="X-Telegram-Init-Data"),
) -> list[DutyWithUser]:
    """Return the duties between ``from`` and ``to`` for an authorised caller.

    Flow:
      1. Validate the date range (400 on failure).
      2. Without an ``X-Telegram-Init-Data`` header, serve the request only if
         the client address is private or ``config.MINI_APP_SKIP_AUTH`` is
         set; otherwise respond 403.
      3. With the header, validate it against ``config.BOT_TOKEN`` (passing
         ``config.INIT_DATA_MAX_AGE_SECONDS`` as the maximum age, or None when
         that setting is falsy) and check the resulting username with
         ``config.can_access_miniapp``; respond 403 if either step fails.
    """
    _validate_duty_dates(from_date, to_date)

    # Normalise once: a missing header and a blank header are treated alike.
    init_data = (x_telegram_init_data or "").strip()
    peer = request.client
    log.info(
        "GET /api/duties from %s, has initData: %s",
        peer.host if peer else "?",
        bool(init_data),
    )

    if not init_data:
        client_host = request.client.host if request.client else None
        bypass_allowed = _is_private_client(client_host) or config.MINI_APP_SKIP_AUTH
        if not bypass_allowed:
            log.warning(
                "duties: no X-Telegram-Init-Data header (client=%s)", client_host
            )
            raise HTTPException(
                status_code=403, detail="Откройте календарь из Telegram"
            )
        if config.MINI_APP_SKIP_AUTH:
            log.warning(
                "duties: allowing without initData (MINI_APP_SKIP_AUTH is set)"
            )
        return _fetch_duties_response(from_date, to_date)

    # A falsy setting is passed on as None instead of a numeric limit.
    max_age = config.INIT_DATA_MAX_AGE_SECONDS or None
    username = validate_init_data(
        init_data,
        config.BOT_TOKEN,
        max_age_seconds=max_age,
    )
    if username is None:
        log.warning(
            "duties: initData validation failed (invalid signature or no username)"
        )
        raise HTTPException(status_code=403, detail="Неверные данные авторизации")

    allowed = config.can_access_miniapp(username)
    if not allowed:
        log.warning("duties: username not in allowlist")
        raise HTTPException(status_code=403, detail="Доступ запрещён")

    return _fetch_duties_response(from_date, to_date)
|
|
|
|
|
|
# Static webapp: the "webapp" directory two levels above this file is served
# under /app, but only when that directory exists.
webapp_path = Path(__file__).resolve().parents[1] / "webapp"
if webapp_path.is_dir():
    app.mount(
        "/app",
        StaticFiles(directory=str(webapp_path), html=True),
        name="webapp",
    )
|