"""Validate Telegram Web App initData and extract user username.""" import hashlib import hmac import json import time from urllib.parse import unquote # Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app # Data-check string: sorted key=value with URL-decoded values, then HMAC-SHA256(WebAppData, token) as secret. def validate_init_data( init_data: str, bot_token: str, max_age_seconds: int | None = None, ) -> str | None: """Validate initData and return username; see validate_init_data_with_reason for failure reason.""" username, _ = validate_init_data_with_reason(init_data, bot_token, max_age_seconds) return username def validate_init_data_with_reason( init_data: str, bot_token: str, max_age_seconds: int | None = None, ) -> tuple[str | None, str]: """ Validate initData signature and return (username, None) or (None, reason). reason is one of: "ok", "empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user", "user_invalid", "no_username". """ if not init_data or not bot_token: return (None, "empty") init_data = init_data.strip() params = {} for part in init_data.split("&"): if "=" not in part: continue key, _, value = part.partition("=") if not key: continue params[key] = value hash_val = params.pop("hash", None) if not hash_val: return (None, "no_hash") data_pairs = sorted(params.items()) # Data-check string: key=value with URL-decoded values (per Telegram example) data_string = "\n".join(f"{k}={unquote(v)}" for k, v in data_pairs) # HMAC-SHA256(key=WebAppData, message=bot_token) per reference implementations secret_key = hmac.new( b"WebAppData", msg=bot_token.encode(), digestmod=hashlib.sha256, ).digest() computed = hmac.new( secret_key, msg=data_string.encode(), digestmod=hashlib.sha256, ).hexdigest() if not hmac.compare_digest(computed.lower(), hash_val.lower()): return (None, "hash_mismatch") if max_age_seconds is not None and max_age_seconds > 0: auth_date_raw = params.get("auth_date") if not auth_date_raw: return (None, "auth_date_expired") try: auth_date = int(float(auth_date_raw)) except (ValueError, TypeError): return (None, "auth_date_expired") if time.time() - auth_date > max_age_seconds: return (None, "auth_date_expired") user_raw = params.get("user") if not user_raw: return (None, "no_user") try: user = json.loads(unquote(user_raw)) except (json.JSONDecodeError, TypeError): return (None, "user_invalid") if not isinstance(user, dict): return (None, "user_invalid") username = user.get("username") if not username or not isinstance(username, str): return (None, "no_username") return (username.strip().lstrip("@").lower(), "ok")