Update configuration and access control for Telegram miniapp
- Added ALLOWED_USERNAMES and ADMIN_USERNAMES to .env.example for user access control. - Implemented validation of Telegram Web App initData in a new telegram_auth.py module. - Enhanced API to check user access before fetching duties. - Updated README with instructions for configuring miniapp access. - Modified .dockerignore and .gitignore to include data directory and database files.
This commit is contained in:
12
api/app.py
12
api/app.py
@@ -2,13 +2,14 @@
|
||||
from pathlib import Path
|
||||
|
||||
import config
|
||||
from fastapi import FastAPI, Query
|
||||
from fastapi import FastAPI, Header, HTTPException, Query
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
from db.session import get_session
|
||||
from db.repository import get_duties
|
||||
from db.schemas import DutyWithUser
|
||||
from api.telegram_auth import validate_init_data
|
||||
|
||||
app = FastAPI(title="Duty Teller API")
|
||||
app.add_middleware(
|
||||
@@ -24,7 +25,16 @@ app.add_middleware(
|
||||
def list_duties(
|
||||
from_date: str = Query(..., description="ISO date YYYY-MM-DD"),
|
||||
to_date: str = Query(..., description="ISO date YYYY-MM-DD"),
|
||||
x_telegram_init_data: str | None = Header(None, alias="X-Telegram-Init-Data"),
|
||||
) -> list[DutyWithUser]:
|
||||
init_data = (x_telegram_init_data or "").strip()
|
||||
if not init_data:
|
||||
raise HTTPException(status_code=403, detail="Откройте календарь из Telegram")
|
||||
username = validate_init_data(init_data, config.BOT_TOKEN)
|
||||
if username is None:
|
||||
raise HTTPException(status_code=403, detail="Неверные данные авторизации")
|
||||
if not config.can_access_miniapp(username):
|
||||
raise HTTPException(status_code=403, detail="Доступ запрещён")
|
||||
session = get_session(config.DATABASE_URL)
|
||||
try:
|
||||
rows = get_duties(session, from_date=from_date, to_date=to_date)
|
||||
|
||||
61
api/telegram_auth.py
Normal file
61
api/telegram_auth.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Validate Telegram Web App initData and extract user username."""
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
from urllib.parse import unquote
|
||||
|
||||
# Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app
|
||||
# Data-check string must use the same key=value pairs as received (sorted by key); we preserve raw values.
|
||||
|
||||
|
||||
def validate_init_data(init_data: str, bot_token: str) -> str | None:
|
||||
"""
|
||||
Validate initData signature and return the Telegram username (lowercase, no @).
|
||||
Returns None if data is invalid, forged, or user has no username.
|
||||
"""
|
||||
if not init_data or not bot_token:
|
||||
return None
|
||||
init_data = init_data.strip()
|
||||
# Parse preserving raw values for HMAC (key -> raw value)
|
||||
params = {}
|
||||
for part in init_data.split("&"):
|
||||
if "=" not in part:
|
||||
continue
|
||||
key, _, value = part.partition("=")
|
||||
if not key:
|
||||
continue
|
||||
params[key] = value
|
||||
hash_val = params.pop("hash", None)
|
||||
if not hash_val:
|
||||
return None
|
||||
# Build data-check string: keys sorted, format key=value per line (raw values)
|
||||
data_pairs = sorted(params.items())
|
||||
data_string = "\n".join(f"{k}={v}" for k, v in data_pairs)
|
||||
# secret_key = HMAC-SHA256(key=b"WebAppData", msg=bot_token.encode()).digest()
|
||||
secret_key = hmac.new(
|
||||
b"WebAppData",
|
||||
msg=bot_token.encode(),
|
||||
digestmod=hashlib.sha256,
|
||||
).digest()
|
||||
# computed = HMAC-SHA256(key=secret_key, msg=data_string.encode()).hexdigest()
|
||||
computed = hmac.new(
|
||||
secret_key,
|
||||
msg=data_string.encode(),
|
||||
digestmod=hashlib.sha256,
|
||||
).hexdigest()
|
||||
if not hmac.compare_digest(computed.lower(), hash_val.lower()):
|
||||
return None
|
||||
# Parse user JSON (value may be URL-encoded in the raw string)
|
||||
user_raw = params.get("user")
|
||||
if not user_raw:
|
||||
return None
|
||||
try:
|
||||
user = json.loads(unquote(user_raw))
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return None
|
||||
if not isinstance(user, dict):
|
||||
return None
|
||||
username = user.get("username")
|
||||
if not username or not isinstance(username, str):
|
||||
return None
|
||||
return username.strip().lstrip("@").lower()
|
||||
Reference in New Issue
Block a user