All checks were successful
CI / lint-and-test (push) Successful in 17s
- Created a new `CHANGELOG.md` file to document all notable changes to the project, adhering to the Keep a Changelog format. - Updated `CONTRIBUTING.md` to include instructions for building and previewing documentation using MkDocs. - Added `mkdocs.yml` configuration for documentation generation, including navigation structure and theme settings. - Enhanced various documentation files, including API reference, architecture overview, configuration reference, and runbook, to provide comprehensive guidance for users and developers. - Included new sections in the README for changelog and documentation links, improving accessibility to project information.
124 lines
4.4 KiB
Python
124 lines
4.4 KiB
Python
"""Validate Telegram Web App initData and extract user username."""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from urllib.parse import unquote
|
|
|
|
# Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app
|
|
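# Data-check string: key=value pairs sorted by key, values URL-decoded, joined with "\n".
# Secret key = HMAC-SHA256(key="WebAppData", msg=bot_token); the expected hash is the
# hex digest of HMAC-SHA256(key=secret key, msg=data-check string).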
|
|
|
|
|
|
def validate_init_data(
    init_data: str,
    bot_token: str,
    max_age_seconds: int | None = None,
) -> str | None:
    """Check Telegram Web App initData and hand back only the username.

    Convenience wrapper around ``validate_init_data_with_reason`` for callers
    that do not need the user id, the failure reason, or the language.

    Args:
        init_data: Raw initData string from tgWebAppData.
        bot_token: Token of the bot that signed the data.
        max_age_seconds: Maximum accepted age of auth_date in seconds;
            None turns the freshness check off.

    Returns:
        The normalized username (lowercase, without a leading @), or None
        when validation fails or the validated user has no username.
    """
    # Only the second element of the 4-tuple is of interest here.
    _user_id, username, _reason, _lang = validate_init_data_with_reason(
        init_data, bot_token, max_age_seconds
    )
    return username
|
|
|
|
|
|
def _normalize_lang(language_code: str | None) -> str:
|
|
"""Normalize to 'ru' or 'en' for i18n."""
|
|
if not language_code or not isinstance(language_code, str):
|
|
return "en"
|
|
code = language_code.strip().lower()
|
|
return "ru" if code.startswith("ru") else "en"
|
|
|
|
|
|
def validate_init_data_with_reason(
|
|
init_data: str,
|
|
bot_token: str,
|
|
max_age_seconds: int | None = None,
|
|
) -> tuple[int | None, str | None, str, str]:
|
|
"""Validate initData signature and return user id, username, reason, and lang.
|
|
|
|
Args:
|
|
init_data: Raw initData string from tgWebAppData.
|
|
bot_token: Bot token (must match the bot that signed the data).
|
|
max_age_seconds: Reject if auth_date older than this; None to disable.
|
|
|
|
Returns:
|
|
Tuple (telegram_user_id, username, reason, lang). reason is one of: "ok",
|
|
"empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user",
|
|
"user_invalid", "no_user_id". lang is from user.language_code normalized
|
|
to 'ru' or 'en'; 'en' when no user. On success: (user.id, username or None,
|
|
"ok", lang).
|
|
"""
|
|
if not init_data or not bot_token:
|
|
return (None, None, "empty", "en")
|
|
init_data = init_data.strip()
|
|
params = {}
|
|
for part in init_data.split("&"):
|
|
if "=" not in part:
|
|
continue
|
|
key, _, value = part.partition("=")
|
|
if not key:
|
|
continue
|
|
params[key] = value
|
|
hash_val = params.pop("hash", None)
|
|
if not hash_val:
|
|
return (None, None, "no_hash", "en")
|
|
data_pairs = sorted(params.items())
|
|
# Data-check string: key=value with URL-decoded values (per Telegram example)
|
|
data_string = "\n".join(f"{k}={unquote(v)}" for k, v in data_pairs)
|
|
# HMAC-SHA256(key=WebAppData, message=bot_token) per reference implementations
|
|
secret_key = hmac.new(
|
|
b"WebAppData",
|
|
msg=bot_token.encode(),
|
|
digestmod=hashlib.sha256,
|
|
).digest()
|
|
computed = hmac.new(
|
|
secret_key,
|
|
msg=data_string.encode(),
|
|
digestmod=hashlib.sha256,
|
|
).hexdigest()
|
|
if not hmac.compare_digest(computed.lower(), hash_val.lower()):
|
|
return (None, None, "hash_mismatch", "en")
|
|
if max_age_seconds is not None and max_age_seconds > 0:
|
|
auth_date_raw = params.get("auth_date")
|
|
if not auth_date_raw:
|
|
return (None, None, "auth_date_expired", "en")
|
|
try:
|
|
auth_date = int(float(auth_date_raw))
|
|
except (ValueError, TypeError):
|
|
return (None, None, "auth_date_expired", "en")
|
|
if time.time() - auth_date > max_age_seconds:
|
|
return (None, None, "auth_date_expired", "en")
|
|
user_raw = params.get("user")
|
|
if not user_raw:
|
|
return (None, None, "no_user", "en")
|
|
try:
|
|
user = json.loads(unquote(user_raw))
|
|
except (json.JSONDecodeError, TypeError):
|
|
return (None, None, "user_invalid", "en")
|
|
if not isinstance(user, dict):
|
|
return (None, None, "user_invalid", "en")
|
|
lang = _normalize_lang(user.get("language_code"))
|
|
raw_id = user.get("id")
|
|
if raw_id is None:
|
|
return (None, None, "no_user_id", lang)
|
|
try:
|
|
telegram_user_id = int(raw_id)
|
|
except (TypeError, ValueError):
|
|
return (None, None, "no_user_id", lang)
|
|
username = user.get("username")
|
|
if username and isinstance(username, str):
|
|
username = username.strip().lstrip("@").lower()
|
|
else:
|
|
username = None
|
|
return (telegram_user_id, username, "ok", lang)
|