Files
duty-teller/duty_teller/api/app.py
Nikolay Tatarinov 87e8417675 refactor: improve code readability and structure in various components
- Refactored the `mini_app_short_name` assignment in `config.py` for better clarity.
- Enhanced the `app_config_js` function in `app.py` to improve formatting of the JavaScript response body.
- Added per-chat locks in `group_duty_pin.py` to prevent concurrent refreshes, improving message handling.
- Updated `_schedule_next_update` to include optional jitter for scheduling, enhancing performance during high-load scenarios.
- Cleaned up test files by removing unused imports and improving test descriptions for clarity.
2026-03-03 17:52:23 +03:00

289 lines
10 KiB
Python

"""FastAPI app: /api/duties, /api/calendar-events, personal ICS, and static webapp at /app."""
import logging
import re
from datetime import date, timedelta
import duty_teller.config as config
from fastapi import Depends, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session
from duty_teller.api.calendar_ics import get_calendar_events
from duty_teller.api.dependencies import (
fetch_duties_response,
get_db_session,
get_validated_dates,
require_miniapp_username,
)
from duty_teller.api.personal_calendar_ics import build_personal_ics, build_team_ics
from duty_teller.cache import ics_calendar_cache
from duty_teller.db.repository import (
get_duties,
get_duties_for_user,
get_user_by_calendar_token,
)
from duty_teller.db.schemas import CalendarEvent, DutyWithUser
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Calendar tokens are secrets.token_urlsafe(32) → base64url, length 43
_CALENDAR_TOKEN_RE = re.compile(r"^[A-Za-z0-9_-]{40,50}$")
def _is_valid_calendar_token(token: str) -> bool:
"""Return True if token matches expected format (length and alphabet). Rejects invalid before DB."""
return bool(token and _CALENDAR_TOKEN_RE.match(token))
# The ASGI application object; routes, the exception handler and middleware
# in this module are all registered on it.
app = FastAPI(title="Duty Teller API")
@app.exception_handler(Exception)
def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Log an unhandled exception and answer with a generic 500.

    Args:
        request: The request being processed when the exception escaped.
        exc: The exception that no other handler dealt with.

    Returns:
        A JSON 500 response whose body never exposes exception details.
    """
    # Pass the exception explicitly. log.exception() defaults to
    # sys.exc_info(), which is empty whenever this sync handler is invoked
    # outside the original ``except`` block (e.g. from a worker thread), and
    # the traceback would then be missing from the log.
    log.exception("Unhandled exception: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )
@app.get("/health", summary="Health check")
def health() -> dict:
    """Return 200 when the app is up. Used by Docker HEALTHCHECK."""
    # Liveness only: a fixed payload, no dependencies are probed here.
    payload = {"status": "ok"}
    return payload
# CORS: origins come from configuration; every method and header is allowed
# and credentials are permitted.
# NOTE(review): with allow_credentials=True, confirm config.CORS_ORIGINS is an
# explicit origin list rather than a wildcard.
app.add_middleware(
    CORSMiddleware,
    allow_origins=config.CORS_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
class NoCacheStaticMiddleware:
    """Raw ASGI middleware that adjusts cache-related response headers.

    * ``Cache-Control: no-store`` is forced on every response for ``/app``
      and ``/app/*``.
    * ``Vary: Accept-Language`` is ensured on all HTTP responses so reverse
      proxies do not serve one user's response to another.

    Non-HTTP scopes are passed through to the wrapped app untouched.
    """

    def __init__(self, app, **kwargs):
        # Extra keyword arguments are accepted and ignored.
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        path = scope.get("path", "")
        is_app_path = path == "/app" or path.startswith("/app/")

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = self._patch_headers(
                    message.get("headers") or [], is_app_path
                )
                # Copy the message instead of rebuilding it from scratch so
                # that any additional keys (e.g. "trailers") are preserved.
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_wrapper)

    @staticmethod
    def _patch_headers(raw_headers, is_app_path):
        """Return a new header list with the cache-related headers applied.

        Args:
            raw_headers: Iterable of ``(name, value)`` byte pairs from the
                ``http.response.start`` message.
            is_app_path: Whether the request path is ``/app`` or below it.

        Returns:
            A new list; the input is not mutated.
        """
        headers = list(raw_headers)

        if is_app_path:
            # Drop every existing Cache-Control header (there may be more
            # than one) so that no-store is the only directive left.
            headers = [h for h in headers if h[0].lower() != b"cache-control"]
            headers.append((b"cache-control", b"no-store"))

        vary_val = b"Accept-Language"
        vary_positions = [
            i for i, h in enumerate(headers) if h[0].lower() == b"vary"
        ]
        if not vary_positions:
            headers.append((b"vary", vary_val))
            return headers

        # Header field names listed in Vary are case-insensitive, so compare
        # lowercased tokens; "*" already covers every request header.
        listed = {
            token.strip().lower()
            for i in vary_positions
            for token in headers[i][1].split(b",")
        }
        if b"*" in listed or vary_val.lower() in listed:
            return headers

        idx = vary_positions[-1]
        existing = headers[idx][1]
        merged = existing + b", " + vary_val if existing.strip() else vary_val
        headers[idx] = (b"vary", merged)
        return headers
# Register the no-store / Vary header middleware on the application.
app.add_middleware(NoCacheStaticMiddleware)
# Allowed values for config.js to prevent script injection.
# Anything outside these sets is replaced by a default before being written
# into the generated JavaScript.
_VALID_LANGS = frozenset({"en", "ru"})
_VALID_LOG_LEVELS = frozenset({"debug", "info", "warning", "error"})
def _safe_js_string(value: str, allowed: frozenset[str], default: str) -> str:
"""Return value if it is in allowed set, else default. Prevents injection in config.js."""
if value in allowed:
return value
return default
# Timezone for duty display: allow only safe chars (letters, digits, /, _, -, +) to prevent injection.
_TZ_SAFE_RE = re.compile(r"^[A-Za-z0-9_/+-]{1,50}$")
def _safe_tz_string(value: str) -> str:
"""Return value if it matches safe timezone pattern, else empty string."""
if value and _TZ_SAFE_RE.match(value.strip()):
return value.strip()
return ""
@app.get(
    "/app/config.js",
    summary="Mini App config (language, log level, timezone)",
    description=(
        "Returns JS that sets window.__DT_LANG, window.__DT_LOG_LEVEL and window.__DT_TZ. "
        "Loaded before main.js."
    ),
)
def app_config_js() -> Response:
    """Build the config.js script consumed by the webapp.

    The script assigns window.__DT_LANG, window.__DT_LOG_LEVEL and
    window.__DT_TZ. Language and log level are restricted to whitelisted
    values; the timezone is emitted as ``undefined`` when it is unset or does
    not pass the safe-character check. The response is marked no-store.
    """
    lang = _safe_js_string(config.DEFAULT_LANGUAGE, _VALID_LANGS, "en")
    log_level = _safe_js_string(config.LOG_LEVEL_STR.lower(), _VALID_LOG_LEVELS, "info")
    tz = _safe_tz_string(config.DUTY_DISPLAY_TZ)

    # A quoted JS string literal when a timezone is configured, else the bare
    # identifier ``undefined``.
    tz_literal = f'"{tz}"' if tz else "undefined"
    statements = (
        f'window.__DT_LANG = "{lang}";',
        f'window.__DT_LOG_LEVEL = "{log_level}";',
        f"window.__DT_TZ = {tz_literal};",
    )
    return Response(
        content="\n".join(statements),
        media_type="application/javascript; charset=utf-8",
        headers={"Cache-Control": "no-store"},
    )
@app.get(
    "/api/duties",
    response_model=list[DutyWithUser],
    summary="List duties",
    description=(
        "Returns duties for the given date range. Requires Telegram Mini App initData "
        "(or MINI_APP_SKIP_AUTH / private IP in dev)."
    ),
)
def list_duties(
    request: Request,
    dates: tuple[str, str] = Depends(get_validated_dates),
    _username: str = Depends(require_miniapp_username),
    session: Session = Depends(get_db_session),
):
    """Return the duties within the validated date range.

    ``_username`` is unused in the body; the dependency is declared so that
    the auth check runs for this route.
    """
    range_start, range_end = dates

    # The client address can be absent, in which case "?" is logged.
    client = request.client
    client_host = client.host if client else "?"
    log.info("GET /api/duties from %s", client_host)

    return fetch_duties_response(session, range_start, range_end)
@app.get(
    "/api/calendar-events",
    response_model=list[CalendarEvent],
    summary="List calendar events",
    description=(
        "Returns calendar events for the date range, including external ICS when "
        "EXTERNAL_CALENDAR_ICS_URL is set. Auth same as /api/duties."
    ),
)
def list_calendar_events(
    dates: tuple[str, str] = Depends(get_validated_dates),
    _username: str = Depends(require_miniapp_username),
):
    """Return events from the external ICS calendar for the date range.

    An empty list is returned when no external calendar URL is configured.
    """
    range_start, range_end = dates
    ics_url = config.EXTERNAL_CALENDAR_ICS_URL
    if not ics_url:
        return []

    raw_events = get_calendar_events(ics_url, from_date=range_start, to_date=range_end)
    result = []
    for raw in raw_events:
        # Only the date and summary of each raw event are exposed.
        result.append(CalendarEvent(date=raw["date"], summary=raw["summary"]))
    return result
@app.get(
    "/api/calendar/ical/team/{token}.ics",
    summary="Team calendar ICS",
    description=(
        "Returns an ICS calendar with all team duty shifts. Each event has "
        "DESCRIPTION set to the duty holder's name. No Telegram auth; access by token."
    ),
)
def get_team_calendar_ical(
    token: str,
    session: Session = Depends(get_db_session),
) -> Response:
    """Serve the team-wide ICS calendar (events of type "duty" only).

    The token must be well-formed and belong to a known user; otherwise a 404
    JSON response is returned. The calendar is shared by all users: it is
    cached under a single key and spans one year back to two years ahead.
    """
    # The format check short-circuits, so malformed tokens never reach the DB.
    if (
        not _is_valid_calendar_token(token)
        or get_user_by_calendar_token(session, token) is None
    ):
        return JSONResponse(status_code=404, content={"detail": "Not found"})

    cache_key = ("team_ics",)
    ics_bytes, found = ics_calendar_cache.get(cache_key)
    if not found:
        today = date.today()
        window_start = (today - timedelta(days=365)).strftime("%Y-%m-%d")
        window_end = (today + timedelta(days=365 * 2)).strftime("%Y-%m-%d")

        duty_rows = []
        for duty, holder_name, *_ in get_duties(
            session, from_date=window_start, to_date=window_end
        ):
            # Rows without an event type are treated as regular duties.
            if (duty.event_type or "duty") == "duty":
                duty_rows.append((duty, holder_name))

        ics_bytes = build_team_ics(duty_rows)
        ics_calendar_cache.set(cache_key, ics_bytes)

    return Response(
        content=ics_bytes,
        media_type="text/calendar; charset=utf-8",
    )
@app.get(
    "/api/calendar/ical/{token}.ics",
    summary="Personal calendar ICS",
    description=(
        "Returns an ICS calendar with the subscribing user's duty shifts only. "
        "No Telegram auth; access is by secret token in the URL."
    ),
)
def get_personal_calendar_ical(
    token: str,
    session: Session = Depends(get_db_session),
) -> Response:
    """Serve the ICS calendar holding only the token owner's duty shifts.

    There is no Telegram auth; the secret token in the URL identifies the
    user. A malformed or unknown token yields a 404 JSON response. The
    calendar is cached per user id and spans one year back to two years ahead.
    """
    # The format check short-circuits, so malformed tokens never reach the DB.
    user = get_user_by_calendar_token(session, token) if _is_valid_calendar_token(token) else None
    if user is None:
        return JSONResponse(status_code=404, content={"detail": "Not found"})

    cache_key = ("personal_ics", user.id)
    ics_bytes, found = ics_calendar_cache.get(cache_key)
    if not found:
        today = date.today()
        window_start = (today - timedelta(days=365)).strftime("%Y-%m-%d")
        window_end = (today + timedelta(days=365 * 2)).strftime("%Y-%m-%d")
        own_duties = get_duties_for_user(
            session,
            user.id,
            from_date=window_start,
            to_date=window_end,
            event_types=["duty"],
        )
        ics_bytes = build_personal_ics(own_duties)
        ics_calendar_cache.set(cache_key, ics_bytes)

    return Response(
        content=ics_bytes,
        media_type="text/calendar; charset=utf-8",
    )
# Serve the built web app at /app, but only when the build output directory
# exists; otherwise no static mount is registered.
webapp_path = config.PROJECT_ROOT / "webapp-next" / "out"
if webapp_path.is_dir():
    app.mount(
        "/app",
        StaticFiles(directory=str(webapp_path), html=True),
        name="webapp",
    )