Enhance Telegram bot functionality and improve error handling
- Introduced a new function to set the default menu button for the Telegram bot's Web App. - Updated the initData validation process to provide detailed error messages for authorization failures. - Refactored the validate_init_data function to return both username and reason for validation failure. - Enhanced the web application to handle access denial more gracefully, providing users with hints on how to access the calendar. - Improved the README with additional instructions for configuring the bot's menu button and Web App URL. - Updated tests to reflect changes in the validation process and error handling.
This commit is contained in:
22
api/app.py
22
api/app.py
@@ -12,7 +12,7 @@ from fastapi.staticfiles import StaticFiles
|
||||
from db.session import session_scope
|
||||
from db.repository import get_duties
|
||||
from db.schemas import DutyWithUser
|
||||
from api.telegram_auth import validate_init_data
|
||||
from api.telegram_auth import validate_init_data_with_reason
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -50,6 +50,16 @@ def _fetch_duties_response(from_date: str, to_date: str) -> list[DutyWithUser]:
|
||||
]
|
||||
|
||||
|
||||
def _auth_error_detail(auth_reason: str) -> str:
|
||||
"""Return user-facing detail message for 403 when initData validation fails."""
|
||||
if auth_reason == "hash_mismatch":
|
||||
return (
|
||||
"Неверная подпись. Убедитесь, что BOT_TOKEN на сервере совпадает с токеном бота, "
|
||||
"из которого открыт календарь (тот же бот, что в меню)."
|
||||
)
|
||||
return "Неверные данные авторизации"
|
||||
|
||||
|
||||
def _is_private_client(client_host: str | None) -> bool:
|
||||
"""True if client is localhost or private LAN (dev / same-machine access).
|
||||
|
||||
@@ -110,12 +120,12 @@ def list_duties(
|
||||
log.warning("duties: no X-Telegram-Init-Data header (client=%s)", client_host)
|
||||
raise HTTPException(status_code=403, detail="Откройте календарь из Telegram")
|
||||
max_age = config.INIT_DATA_MAX_AGE_SECONDS or None
|
||||
username = validate_init_data(init_data, config.BOT_TOKEN, max_age_seconds=max_age)
|
||||
username, auth_reason = validate_init_data_with_reason(
|
||||
init_data, config.BOT_TOKEN, max_age_seconds=max_age
|
||||
)
|
||||
if username is None:
|
||||
log.warning(
|
||||
"duties: initData validation failed (invalid signature or no username)"
|
||||
)
|
||||
raise HTTPException(status_code=403, detail="Неверные данные авторизации")
|
||||
log.warning("duties: initData validation failed: %s", auth_reason)
|
||||
raise HTTPException(status_code=403, detail=_auth_error_detail(auth_reason))
|
||||
if not config.can_access_miniapp(username):
|
||||
log.warning("duties: username not in allowlist")
|
||||
raise HTTPException(status_code=403, detail="Доступ запрещён")
|
||||
|
||||
@@ -15,17 +15,23 @@ def validate_init_data(
|
||||
bot_token: str,
|
||||
max_age_seconds: int | None = None,
|
||||
) -> str | None:
|
||||
"""
|
||||
Validate initData signature and return the Telegram username (lowercase, no @).
|
||||
Returns None if data is invalid, forged, or user has no username.
|
||||
"""Validate initData and return username; see validate_init_data_with_reason for failure reason."""
|
||||
username, _ = validate_init_data_with_reason(init_data, bot_token, max_age_seconds)
|
||||
return username
|
||||
|
||||
If max_age_seconds is set, initData must include auth_date and it must be no older
|
||||
than max_age_seconds (replay protection). Example: 86400 = 24 hours.
|
||||
|
||||
def validate_init_data_with_reason(
|
||||
init_data: str,
|
||||
bot_token: str,
|
||||
max_age_seconds: int | None = None,
|
||||
) -> tuple[str | None, str]:
|
||||
"""
|
||||
Validate initData signature and return (username, None) or (None, reason).
|
||||
reason is one of: "ok", "empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user", "user_invalid", "no_username".
|
||||
"""
|
||||
if not init_data or not bot_token:
|
||||
return None
|
||||
return (None, "empty")
|
||||
init_data = init_data.strip()
|
||||
# Parse preserving raw values for HMAC (key -> raw value)
|
||||
params = {}
|
||||
for part in init_data.split("&"):
|
||||
if "=" not in part:
|
||||
@@ -36,46 +42,41 @@ def validate_init_data(
|
||||
params[key] = value
|
||||
hash_val = params.pop("hash", None)
|
||||
if not hash_val:
|
||||
return None
|
||||
# Build data-check string: keys sorted, format key=value per line (raw values)
|
||||
return (None, "no_hash")
|
||||
data_pairs = sorted(params.items())
|
||||
data_string = "\n".join(f"{k}={v}" for k, v in data_pairs)
|
||||
# secret_key = HMAC-SHA256(key=b"WebAppData", msg=bot_token.encode()).digest()
|
||||
secret_key = hmac.new(
|
||||
b"WebAppData",
|
||||
msg=bot_token.encode(),
|
||||
digestmod=hashlib.sha256,
|
||||
).digest()
|
||||
# computed = HMAC-SHA256(key=secret_key, msg=data_string.encode()).hexdigest()
|
||||
computed = hmac.new(
|
||||
secret_key,
|
||||
msg=data_string.encode(),
|
||||
digestmod=hashlib.sha256,
|
||||
).hexdigest()
|
||||
if not hmac.compare_digest(computed.lower(), hash_val.lower()):
|
||||
return None
|
||||
# Optional replay protection: reject initData older than max_age_seconds
|
||||
return (None, "hash_mismatch")
|
||||
if max_age_seconds is not None and max_age_seconds > 0:
|
||||
auth_date_raw = params.get("auth_date")
|
||||
if not auth_date_raw:
|
||||
return None
|
||||
return (None, "auth_date_expired")
|
||||
try:
|
||||
auth_date = int(float(auth_date_raw))
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
return (None, "auth_date_expired")
|
||||
if time.time() - auth_date > max_age_seconds:
|
||||
return None
|
||||
# Parse user JSON (value may be URL-encoded in the raw string)
|
||||
return (None, "auth_date_expired")
|
||||
user_raw = params.get("user")
|
||||
if not user_raw:
|
||||
return None
|
||||
return (None, "no_user")
|
||||
try:
|
||||
user = json.loads(unquote(user_raw))
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return None
|
||||
return (None, "user_invalid")
|
||||
if not isinstance(user, dict):
|
||||
return None
|
||||
return (None, "user_invalid")
|
||||
username = user.get("username")
|
||||
if not username or not isinstance(username, str):
|
||||
return None
|
||||
return username.strip().lstrip("@").lower()
|
||||
return (None, "no_username")
|
||||
return (username.strip().lstrip("@").lower(), "ok")
|
||||
|
||||
@@ -53,22 +53,23 @@ def test_duties_200_when_skip_auth(mock_fetch, client):
|
||||
mock_fetch.assert_called_once_with("2025-01-01", "2025-01-31")
|
||||
|
||||
|
||||
@patch("api.app.validate_init_data")
|
||||
@patch("api.app.validate_init_data_with_reason")
|
||||
def test_duties_403_when_init_data_invalid(mock_validate, client):
|
||||
mock_validate.return_value = None
|
||||
mock_validate.return_value = (None, "hash_mismatch")
|
||||
r = client.get(
|
||||
"/api/duties",
|
||||
params={"from": "2025-01-01", "to": "2025-01-31"},
|
||||
headers={"X-Telegram-Init-Data": "some=data&hash=abc"},
|
||||
)
|
||||
assert r.status_code == 403
|
||||
assert "авторизации" in r.json()["detail"] or "Неверные" in r.json()["detail"]
|
||||
detail = r.json()["detail"]
|
||||
assert "авторизации" in detail or "Неверные" in detail or "Неверная" in detail
|
||||
|
||||
|
||||
@patch("api.app.validate_init_data")
|
||||
@patch("api.app.validate_init_data_with_reason")
|
||||
@patch("api.app.config.can_access_miniapp")
|
||||
def test_duties_403_when_username_not_allowed(mock_can_access, mock_validate, client):
|
||||
mock_validate.return_value = "someuser"
|
||||
mock_validate.return_value = ("someuser", "ok")
|
||||
mock_can_access.return_value = False
|
||||
with patch("api.app._fetch_duties_response") as mock_fetch:
|
||||
r = client.get(
|
||||
@@ -81,10 +82,10 @@ def test_duties_403_when_username_not_allowed(mock_can_access, mock_validate, cl
|
||||
mock_fetch.assert_not_called()
|
||||
|
||||
|
||||
@patch("api.app.validate_init_data")
|
||||
@patch("api.app.validate_init_data_with_reason")
|
||||
@patch("api.app.config.can_access_miniapp")
|
||||
def test_duties_200_with_allowed_user(mock_can_access, mock_validate, client):
|
||||
mock_validate.return_value = "alloweduser"
|
||||
mock_validate.return_value = ("alloweduser", "ok")
|
||||
mock_can_access.return_value = True
|
||||
with patch("api.app._fetch_duties_response") as mock_fetch:
|
||||
mock_fetch.return_value = [
|
||||
@@ -108,7 +109,7 @@ def test_duties_200_with_allowed_user(mock_can_access, mock_validate, client):
|
||||
|
||||
|
||||
def test_duties_e2e_auth_real_validation(client, monkeypatch):
|
||||
"""E2E: valid initData + allowlist, no mocks on validate_init_data; full auth path."""
|
||||
"""E2E: valid initData + allowlist, no mocks on validate_init_data_with_reason; full auth path."""
|
||||
test_token = "123:ABC"
|
||||
test_username = "e2euser"
|
||||
monkeypatch.setattr(config, "BOT_TOKEN", test_token)
|
||||
|
||||
Reference in New Issue
Block a user