"""Validate Telegram Web App initData and extract user username.""" import hashlib import hmac import json import time from urllib.parse import unquote import duty_teller.config as config # Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app # Data-check string: sorted key=value with URL-decoded values, then HMAC-SHA256(WebAppData, token) as secret. def validate_init_data( init_data: str, bot_token: str, max_age_seconds: int | None = None, ) -> str | None: """Validate Telegram Web App initData and return username if valid. Args: init_data: Raw initData string from tgWebAppData. bot_token: Bot token (must match the bot that signed the data). max_age_seconds: Reject if auth_date older than this; None to disable. Returns: Username (lowercase, no @) or None if validation fails. """ _, username, _, _ = validate_init_data_with_reason( init_data, bot_token, max_age_seconds ) return username def validate_init_data_with_reason( init_data: str, bot_token: str, max_age_seconds: int | None = None, ) -> tuple[int | None, str | None, str, str]: """Validate initData signature and return user id, username, reason, and lang. Args: init_data: Raw initData string from tgWebAppData. bot_token: Bot token (must match the bot that signed the data). max_age_seconds: Reject if auth_date older than this; None to disable. Returns: Tuple (telegram_user_id, username, reason, lang). reason is one of: "ok", "empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user", "user_invalid", "no_user_id". lang is always config.DEFAULT_LANGUAGE. On success: (user.id, username or None, "ok", lang). """ lang = config.DEFAULT_LANGUAGE if not init_data or not bot_token: return (None, None, "empty", lang) init_data = init_data.strip() params = {} for part in init_data.split("&"): if "=" not in part: continue key, _, value = part.partition("=") if not key: continue params[key] = value hash_val = params.pop("hash", None) if not hash_val: return (None, None, "no_hash", lang) data_pairs = sorted(params.items()) # Data-check string: key=value with URL-decoded values (per Telegram example) data_string = "\n".join(f"{k}={unquote(v)}" for k, v in data_pairs) # HMAC-SHA256(key=WebAppData, message=bot_token) per reference implementations secret_key = hmac.new( b"WebAppData", msg=bot_token.encode(), digestmod=hashlib.sha256, ).digest() computed = hmac.new( secret_key, msg=data_string.encode(), digestmod=hashlib.sha256, ).hexdigest() if not hmac.compare_digest(computed.lower(), hash_val.lower()): return (None, None, "hash_mismatch", lang) if max_age_seconds is not None and max_age_seconds > 0: auth_date_raw = params.get("auth_date") if not auth_date_raw: return (None, None, "auth_date_expired", lang) try: auth_date = int(float(auth_date_raw)) except (ValueError, TypeError): return (None, None, "auth_date_expired", lang) if time.time() - auth_date > max_age_seconds: return (None, None, "auth_date_expired", lang) user_raw = params.get("user") if not user_raw: return (None, None, "no_user", lang) try: user = json.loads(unquote(user_raw)) except (json.JSONDecodeError, TypeError): return (None, None, "user_invalid", lang) if not isinstance(user, dict): return (None, None, "user_invalid", lang) raw_id = user.get("id") if raw_id is None: return (None, None, "no_user_id", lang) try: telegram_user_id = int(raw_id) except (TypeError, ValueError): return (None, None, "no_user_id", lang) username = user.get("username") if username and isinstance(username, str): username = username.strip().lstrip("@").lower() else: username = None return (telegram_user_id, username, "ok", lang)