- Added functionality to normalize phone numbers for comparison, ensuring only digits are stored and checked. - Updated configuration to include optional phone number allowlists for users and admins in the environment settings. - Enhanced authentication logic to allow access based on normalized phone numbers, in addition to usernames. - Introduced new helper functions for parsing and validating phone numbers, improving code organization and maintainability. - Added unit tests to validate phone normalization and access control based on phone numbers.
108 lines
4.0 KiB
Python
108 lines
4.0 KiB
Python
"""Validate Telegram Web App initData and extract user username."""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from urllib.parse import unquote
|
|
|
|
# Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app
|
|
# Data-check string: sorted key=value with URL-decoded values, then HMAC-SHA256(WebAppData, token) as secret.
|
|
|
|
|
|
def validate_init_data(
|
|
init_data: str,
|
|
bot_token: str,
|
|
max_age_seconds: int | None = None,
|
|
) -> str | None:
|
|
"""Validate initData and return username; see validate_init_data_with_reason for failure reason."""
|
|
_, username, _, _ = validate_init_data_with_reason(
|
|
init_data, bot_token, max_age_seconds
|
|
)
|
|
return username
|
|
|
|
|
|
def _normalize_lang(language_code: str | None) -> str:
|
|
"""Normalize to 'ru' or 'en' for i18n."""
|
|
if not language_code or not isinstance(language_code, str):
|
|
return "en"
|
|
code = language_code.strip().lower()
|
|
return "ru" if code.startswith("ru") else "en"
|
|
|
|
|
|
def validate_init_data_with_reason(
|
|
init_data: str,
|
|
bot_token: str,
|
|
max_age_seconds: int | None = None,
|
|
) -> tuple[int | None, str | None, str, str]:
|
|
"""
|
|
Validate initData signature and return (telegram_user_id, username, reason, lang).
|
|
reason is one of: "ok", "empty", "no_hash", "hash_mismatch", "auth_date_expired",
|
|
"no_user", "user_invalid", "no_user_id", "no_username" (legacy; no_user_id used when user.id missing).
|
|
lang is from user.language_code normalized to 'ru' or 'en'; 'en' when no user.
|
|
On success: (user.id, user.get('username') or None, "ok", lang) — username may be None.
|
|
"""
|
|
if not init_data or not bot_token:
|
|
return (None, None, "empty", "en")
|
|
init_data = init_data.strip()
|
|
params = {}
|
|
for part in init_data.split("&"):
|
|
if "=" not in part:
|
|
continue
|
|
key, _, value = part.partition("=")
|
|
if not key:
|
|
continue
|
|
params[key] = value
|
|
hash_val = params.pop("hash", None)
|
|
if not hash_val:
|
|
return (None, None, "no_hash", "en")
|
|
data_pairs = sorted(params.items())
|
|
# Data-check string: key=value with URL-decoded values (per Telegram example)
|
|
data_string = "\n".join(f"{k}={unquote(v)}" for k, v in data_pairs)
|
|
# HMAC-SHA256(key=WebAppData, message=bot_token) per reference implementations
|
|
secret_key = hmac.new(
|
|
b"WebAppData",
|
|
msg=bot_token.encode(),
|
|
digestmod=hashlib.sha256,
|
|
).digest()
|
|
computed = hmac.new(
|
|
secret_key,
|
|
msg=data_string.encode(),
|
|
digestmod=hashlib.sha256,
|
|
).hexdigest()
|
|
if not hmac.compare_digest(computed.lower(), hash_val.lower()):
|
|
return (None, None, "hash_mismatch", "en")
|
|
if max_age_seconds is not None and max_age_seconds > 0:
|
|
auth_date_raw = params.get("auth_date")
|
|
if not auth_date_raw:
|
|
return (None, None, "auth_date_expired", "en")
|
|
try:
|
|
auth_date = int(float(auth_date_raw))
|
|
except (ValueError, TypeError):
|
|
return (None, None, "auth_date_expired", "en")
|
|
if time.time() - auth_date > max_age_seconds:
|
|
return (None, None, "auth_date_expired", "en")
|
|
user_raw = params.get("user")
|
|
if not user_raw:
|
|
return (None, None, "no_user", "en")
|
|
try:
|
|
user = json.loads(unquote(user_raw))
|
|
except (json.JSONDecodeError, TypeError):
|
|
return (None, None, "user_invalid", "en")
|
|
if not isinstance(user, dict):
|
|
return (None, None, "user_invalid", "en")
|
|
lang = _normalize_lang(user.get("language_code"))
|
|
raw_id = user.get("id")
|
|
if raw_id is None:
|
|
return (None, None, "no_user_id", lang)
|
|
try:
|
|
telegram_user_id = int(raw_id)
|
|
except (TypeError, ValueError):
|
|
return (None, None, "no_user_id", lang)
|
|
username = user.get("username")
|
|
if username and isinstance(username, str):
|
|
username = username.strip().lstrip("@").lower()
|
|
else:
|
|
username = None
|
|
return (telegram_user_id, username, "ok", lang)
|