Implement phone number normalization and access control for Telegram users

- Added functionality to normalize phone numbers for comparison, ensuring only digits are stored and checked.
- Updated configuration to include optional phone number allowlists for users and admins in the environment settings.
- Enhanced authentication logic to allow access based on normalized phone numbers, in addition to usernames.
- Introduced new helper functions for parsing and validating phone numbers, improving code organization and maintainability.
- Added unit tests to validate phone normalization and access control based on phone numbers.
This commit is contained in:
2026-02-18 16:11:44 +03:00
parent d0d22c150a
commit 59ba2a9ca4
10 changed files with 344 additions and 48 deletions

View File

@@ -1,5 +1,6 @@
"""Load configuration from environment. BOT_TOKEN is not validated on import; check in main/entry point."""
import re
import os
from dataclasses import dataclass
from pathlib import Path
@@ -11,6 +12,16 @@ load_dotenv()
# Project root (parent of duty_teller package). Used for webapp path, etc.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Only digits for phone comparison (same when saving and when checking allowlist).
_PHONE_DIGITS_RE = re.compile(r"\D")
def normalize_phone(phone: str | None) -> str:
"""Return phone as digits only (spaces, +, parentheses, dashes removed). Empty string if None/empty."""
if not phone or not isinstance(phone, str):
return ""
return _PHONE_DIGITS_RE.sub("", phone.strip())
def _normalize_default_language(value: str) -> str:
"""Normalize DEFAULT_LANGUAGE from env to 'ru' or 'en'."""
@@ -20,6 +31,16 @@ def _normalize_default_language(value: str) -> str:
return "ru" if v.startswith("ru") else "en"
def _parse_phone_list(raw: str) -> set[str]:
"""Parse comma-separated phones into set of normalized (digits-only) strings."""
result = set()
for s in (raw or "").strip().split(","):
normalized = normalize_phone(s)
if normalized:
result.add(normalized)
return result
@dataclass(frozen=True)
class Settings:
"""Optional injectable settings built from env. Tests can override or build from env."""
@@ -30,6 +51,8 @@ class Settings:
http_port: int
allowed_usernames: set[str]
admin_usernames: set[str]
allowed_phones: set[str]
admin_phones: set[str]
mini_app_skip_auth: bool
init_data_max_age_seconds: int
cors_origins: list[str]
@@ -49,6 +72,8 @@ class Settings:
admin = {
s.strip().lstrip("@").lower() for s in raw_admin.split(",") if s.strip()
}
allowed_phones = _parse_phone_list(os.getenv("ALLOWED_PHONES", ""))
admin_phones = _parse_phone_list(os.getenv("ADMIN_PHONES", ""))
raw_cors = os.getenv("CORS_ORIGINS", "").strip()
cors = (
[_o.strip() for _o in raw_cors.split(",") if _o.strip()]
@@ -62,6 +87,8 @@ class Settings:
http_port=int(os.getenv("HTTP_PORT", "8080")),
allowed_usernames=allowed,
admin_usernames=admin,
allowed_phones=allowed_phones,
admin_phones=admin_phones,
mini_app_skip_auth=os.getenv("MINI_APP_SKIP_AUTH", "").strip()
in ("1", "true", "yes"),
init_data_max_age_seconds=int(os.getenv("INIT_DATA_MAX_AGE_SECONDS", "0")),
@@ -93,6 +120,9 @@ ADMIN_USERNAMES = {
s.strip().lstrip("@").lower() for s in _raw_admin.split(",") if s.strip()
}
ALLOWED_PHONES = _parse_phone_list(os.getenv("ALLOWED_PHONES", ""))
ADMIN_PHONES = _parse_phone_list(os.getenv("ADMIN_PHONES", ""))
MINI_APP_SKIP_AUTH = os.getenv("MINI_APP_SKIP_AUTH", "").strip() in ("1", "true", "yes")
INIT_DATA_MAX_AGE_SECONDS = int(os.getenv("INIT_DATA_MAX_AGE_SECONDS", "0"))
@@ -123,6 +153,20 @@ def can_access_miniapp(username: str) -> bool:
return u in ALLOWED_USERNAMES or u in ADMIN_USERNAMES
def can_access_miniapp_by_phone(phone: str | None) -> bool:
"""True if normalized phone is in ALLOWED_PHONES or ADMIN_PHONES."""
normalized = normalize_phone(phone)
if not normalized:
return False
return normalized in ALLOWED_PHONES or normalized in ADMIN_PHONES
def is_admin_by_phone(phone: str | None) -> bool:
"""True if normalized phone is in ADMIN_PHONES."""
normalized = normalize_phone(phone)
return bool(normalized and normalized in ADMIN_PHONES)
def require_bot_token() -> None:
"""Raise SystemExit with a clear message if BOT_TOKEN is not set. Call from entry point."""
if not BOT_TOKEN: