All checks were successful
CI / lint-and-test (push) Successful in 21s
- Introduced a new `normalize_lang` function to standardize language codes across the application, ensuring consistent handling of user language preferences. - Refactored date handling utilities by adding `parse_utc_iso` and `parse_utc_iso_naive` functions for better parsing of ISO 8601 date strings, enhancing timezone awareness. - Updated various modules to utilize the new language normalization and date parsing functions, improving code clarity and maintainability. - Enhanced error handling in date validation to raise specific `DateRangeValidationError` exceptions, providing clearer feedback on validation issues. - Improved test coverage for date range validation and language normalization functionalities, ensuring robustness and reliability.
118 lines
4.2 KiB
Python
118 lines
4.2 KiB
Python
"""Validate Telegram Web App initData and extract user username."""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from urllib.parse import unquote
|
|
|
|
from duty_teller.i18n.lang import normalize_lang
|
|
|
|
# Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app
|
|
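# Data-check string: all fields except "hash", sorted by key, one "key=value" per line with URL-decoded values.
# Secret key = HMAC-SHA256(key="WebAppData", msg=bot_token); expected hash = hex HMAC-SHA256(secret key, data-check string).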
|
|
|
|
|
|
def validate_init_data(
    init_data: str,
    bot_token: str,
    max_age_seconds: int | None = None,
) -> str | None:
    """Check Telegram Web App initData and return only the username.

    Thin convenience wrapper around ``validate_init_data_with_reason`` that
    discards the user id, failure reason, and language.

    Args:
        init_data: Raw initData string from tgWebAppData.
        bot_token: Bot token (must match the bot that signed the data).
        max_age_seconds: Reject if auth_date older than this; None to disable.

    Returns:
        Username (lowercase, no @) or None if validation fails.
    """
    result = validate_init_data_with_reason(init_data, bot_token, max_age_seconds)
    # Result layout: (telegram_user_id, username, reason, lang).
    _user_id, username, _reason, _lang = result
    return username
|
|
|
|
|
|
def validate_init_data_with_reason(
|
|
init_data: str,
|
|
bot_token: str,
|
|
max_age_seconds: int | None = None,
|
|
) -> tuple[int | None, str | None, str, str]:
|
|
"""Validate initData signature and return user id, username, reason, and lang.
|
|
|
|
Args:
|
|
init_data: Raw initData string from tgWebAppData.
|
|
bot_token: Bot token (must match the bot that signed the data).
|
|
max_age_seconds: Reject if auth_date older than this; None to disable.
|
|
|
|
Returns:
|
|
Tuple (telegram_user_id, username, reason, lang). reason is one of: "ok",
|
|
"empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user",
|
|
"user_invalid", "no_user_id". lang is from user.language_code normalized
|
|
to 'ru' or 'en'; 'en' when no user. On success: (user.id, username or None,
|
|
"ok", lang).
|
|
"""
|
|
if not init_data or not bot_token:
|
|
return (None, None, "empty", "en")
|
|
init_data = init_data.strip()
|
|
params = {}
|
|
for part in init_data.split("&"):
|
|
if "=" not in part:
|
|
continue
|
|
key, _, value = part.partition("=")
|
|
if not key:
|
|
continue
|
|
params[key] = value
|
|
hash_val = params.pop("hash", None)
|
|
if not hash_val:
|
|
return (None, None, "no_hash", "en")
|
|
data_pairs = sorted(params.items())
|
|
# Data-check string: key=value with URL-decoded values (per Telegram example)
|
|
data_string = "\n".join(f"{k}={unquote(v)}" for k, v in data_pairs)
|
|
# HMAC-SHA256(key=WebAppData, message=bot_token) per reference implementations
|
|
secret_key = hmac.new(
|
|
b"WebAppData",
|
|
msg=bot_token.encode(),
|
|
digestmod=hashlib.sha256,
|
|
).digest()
|
|
computed = hmac.new(
|
|
secret_key,
|
|
msg=data_string.encode(),
|
|
digestmod=hashlib.sha256,
|
|
).hexdigest()
|
|
if not hmac.compare_digest(computed.lower(), hash_val.lower()):
|
|
return (None, None, "hash_mismatch", "en")
|
|
if max_age_seconds is not None and max_age_seconds > 0:
|
|
auth_date_raw = params.get("auth_date")
|
|
if not auth_date_raw:
|
|
return (None, None, "auth_date_expired", "en")
|
|
try:
|
|
auth_date = int(float(auth_date_raw))
|
|
except (ValueError, TypeError):
|
|
return (None, None, "auth_date_expired", "en")
|
|
if time.time() - auth_date > max_age_seconds:
|
|
return (None, None, "auth_date_expired", "en")
|
|
user_raw = params.get("user")
|
|
if not user_raw:
|
|
return (None, None, "no_user", "en")
|
|
try:
|
|
user = json.loads(unquote(user_raw))
|
|
except (json.JSONDecodeError, TypeError):
|
|
return (None, None, "user_invalid", "en")
|
|
if not isinstance(user, dict):
|
|
return (None, None, "user_invalid", "en")
|
|
lang = normalize_lang(user.get("language_code"))
|
|
raw_id = user.get("id")
|
|
if raw_id is None:
|
|
return (None, None, "no_user_id", lang)
|
|
try:
|
|
telegram_user_id = int(raw_id)
|
|
except (TypeError, ValueError):
|
|
return (None, None, "no_user_id", lang)
|
|
username = user.get("username")
|
|
if username and isinstance(username, str):
|
|
username = username.strip().lstrip("@").lower()
|
|
else:
|
|
username = None
|
|
return (telegram_user_id, username, "ok", lang)
|