From 1948618394dcfe9a34848453ce8a69e129df8e26 Mon Sep 17 00:00:00 2001 From: Nikolay Tatarinov Date: Tue, 17 Feb 2026 17:31:20 +0300 Subject: [PATCH] Refactor configuration and enhance Telegram initData validation - Improved formatting and readability in config.py and other files by adding line breaks. - Introduced INIT_DATA_MAX_AGE_SECONDS to enforce replay protection for Telegram initData. - Updated validate_init_data function to include max_age_seconds parameter for validation. - Enhanced API to reject old initData based on the new max_age_seconds setting. - Added tests for auth_date expiry and validation of initData in test_telegram_auth.py. - Updated README with details on the new INIT_DATA_MAX_AGE_SECONDS configuration. --- README.md | 5 ++- alembic/env.py | 1 + .../versions/001_initial_users_and_duties.py | 6 ++- api/app.py | 28 +++++++++--- api/telegram_auth.py | 22 ++++++++- api/test_app.py | 29 ++++++++++++ api/test_telegram_auth.py | 45 +++++++++++++++++-- config.py | 22 +++++++-- conftest.py | 1 + db/__init__.py | 1 + db/models.py | 10 ++++- db/repository.py | 1 + db/schemas.py | 4 +- db/session.py | 6 ++- handlers/__init__.py | 1 + handlers/commands.py | 19 ++++++-- handlers/errors.py | 1 + main.py | 3 ++ tests/test_config.py | 1 - 19 files changed, 181 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 77fbfb3..dd3b444 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ A minimal Telegram bot boilerplate using [python-telegram-bot](https://github.co - `DATABASE_URL` – DB connection (default: `sqlite:///data/duty_teller.db`). - `MINI_APP_BASE_URL` – Base URL of the miniapp (for documentation / CORS). - `MINI_APP_SKIP_AUTH` – Set to `1` to allow `/api/duties` without Telegram initData (dev only; insecure). + - `INIT_DATA_MAX_AGE_SECONDS` – Reject Telegram initData older than this (e.g. `86400` = 24h). `0` = disabled (default). - `CORS_ORIGINS` – Comma-separated allowed origins for CORS, or leave unset for `*`. ## Run @@ -66,6 +67,8 @@ Ensure `.env` exists (e.g. `cp .env.example .env`) and contains `BOT_TOKEN`. ``` For production deployments you may use Docker secrets or your orchestrator’s env instead of a `.env` file. +**Production behind a reverse proxy:** When the app is behind nginx/Caddy etc., `request.client.host` is usually the proxy (e.g. 127.0.0.1). The "private IP" bypass (allowing requests without initData from localhost) then applies to the proxy, not the real client. Either ensure the Mini App always sends initData, or forward the real client IP (e.g. `X-Forwarded-For`) and use it for that check. See `api/app.py` `_is_private_client` for details. + ## Project layout - `main.py` – Builds the `Application`, registers handlers, runs polling and FastAPI in a thread. @@ -88,4 +91,4 @@ pip install -r requirements-dev.txt pytest ``` -Tests cover `api/telegram_auth` (validate_init_data), `config` (is_admin, can_access_miniapp), and the API (date validation, 403/200 with mocked auth). +Tests cover `api/telegram_auth` (validate_init_data, auth_date expiry), `config` (is_admin, can_access_miniapp), and the API (date validation, 403/200 with mocked auth, plus an E2E auth test without auth mocks). diff --git a/alembic/env.py b/alembic/env.py index 6a2c797..0fe9bf6 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -1,4 +1,5 @@ """Alembic env: use config DATABASE_URL and db.models.Base.""" + import os import sys from logging.config import fileConfig diff --git a/alembic/versions/001_initial_users_and_duties.py b/alembic/versions/001_initial_users_and_duties.py index 39424f6..70613d8 100644 --- a/alembic/versions/001_initial_users_and_duties.py +++ b/alembic/versions/001_initial_users_and_duties.py @@ -5,6 +5,7 @@ Revises: Create Date: 2025-02-17 """ + from typing import Sequence, Union from alembic import op @@ -34,7 +35,10 @@ def upgrade() -> None: sa.Column("user_id", sa.Integer(), nullable=False), sa.Column("start_at", sa.Text(), nullable=False), sa.Column("end_at", sa.Text(), nullable=False), - sa.ForeignKeyConstraint(["user_id"], ["users.id"], ), + sa.ForeignKeyConstraint( + ["user_id"], + ["users.id"], + ), sa.PrimaryKeyConstraint("id"), ) diff --git a/api/app.py b/api/app.py index c309250..fc81982 100644 --- a/api/app.py +++ b/api/app.py @@ -1,4 +1,5 @@ """FastAPI app: /api/duties and static webapp.""" + import logging import re from pathlib import Path @@ -50,7 +51,15 @@ def _fetch_duties_response(from_date: str, to_date: str) -> list[DutyWithUser]: def _is_private_client(client_host: str | None) -> bool: - """True if client is localhost or private LAN (dev / same-machine access).""" + """True if client is localhost or private LAN (dev / same-machine access). + + Note: Behind a reverse proxy (e.g. nginx, Caddy), request.client.host is often + the proxy address (e.g. 127.0.0.1). Then "private client" would be true for all + requests when initData is missing. For production, either rely on the Mini App + always sending initData, or configure the proxy to forward the real client IP + (e.g. X-Forwarded-For) and use that for this check. Do not rely on the private-IP + bypass when deployed behind a proxy without one of these measures. + """ if not client_host: return False if client_host in ("127.0.0.1", "::1"): @@ -84,19 +93,28 @@ def list_duties( x_telegram_init_data: str | None = Header(None, alias="X-Telegram-Init-Data"), ) -> list[DutyWithUser]: _validate_duty_dates(from_date, to_date) - log.info("GET /api/duties from %s, has initData: %s", request.client.host if request.client else "?", bool((x_telegram_init_data or "").strip())) + log.info( + "GET /api/duties from %s, has initData: %s", + request.client.host if request.client else "?", + bool((x_telegram_init_data or "").strip()), + ) init_data = (x_telegram_init_data or "").strip() if not init_data: client_host = request.client.host if request.client else None if _is_private_client(client_host) or config.MINI_APP_SKIP_AUTH: if config.MINI_APP_SKIP_AUTH: - log.warning("duties: allowing without initData (MINI_APP_SKIP_AUTH is set)") + log.warning( + "duties: allowing without initData (MINI_APP_SKIP_AUTH is set)" + ) return _fetch_duties_response(from_date, to_date) log.warning("duties: no X-Telegram-Init-Data header (client=%s)", client_host) raise HTTPException(status_code=403, detail="Откройте календарь из Telegram") - username = validate_init_data(init_data, config.BOT_TOKEN) + max_age = config.INIT_DATA_MAX_AGE_SECONDS or None + username = validate_init_data(init_data, config.BOT_TOKEN, max_age_seconds=max_age) if username is None: - log.warning("duties: initData validation failed (invalid signature or no username)") + log.warning( + "duties: initData validation failed (invalid signature or no username)" + ) raise HTTPException(status_code=403, detail="Неверные данные авторизации") if not config.can_access_miniapp(username): log.warning("duties: username not in allowlist") diff --git a/api/telegram_auth.py b/api/telegram_auth.py index e3a057f..ace88bb 100644 --- a/api/telegram_auth.py +++ b/api/telegram_auth.py @@ -1,17 +1,26 @@ """Validate Telegram Web App initData and extract user username.""" + import hashlib import hmac import json +import time from urllib.parse import unquote # Telegram algorithm: https://core.telegram.org/bots/webapps#validating-data-received-via-the-mini-app # Data-check string must use the same key=value pairs as received (sorted by key); we preserve raw values. -def validate_init_data(init_data: str, bot_token: str) -> str | None: +def validate_init_data( + init_data: str, + bot_token: str, + max_age_seconds: int | None = None, +) -> str | None: """ Validate initData signature and return the Telegram username (lowercase, no @). Returns None if data is invalid, forged, or user has no username. + + If max_age_seconds is set, initData must include auth_date and it must be no older + than max_age_seconds (replay protection). Example: 86400 = 24 hours. """ if not init_data or not bot_token: return None @@ -45,6 +54,17 @@ def validate_init_data(init_data: str, bot_token: str) -> str | None: ).hexdigest() if not hmac.compare_digest(computed.lower(), hash_val.lower()): return None + # Optional replay protection: reject initData older than max_age_seconds + if max_age_seconds is not None and max_age_seconds > 0: + auth_date_raw = params.get("auth_date") + if not auth_date_raw: + return None + try: + auth_date = int(float(auth_date_raw)) + except (ValueError, TypeError): + return None + if time.time() - auth_date > max_age_seconds: + return None # Parse user JSON (value may be URL-encoded in the raw string) user_raw = params.get("user") if not user_raw: diff --git a/api/test_app.py b/api/test_app.py index e00a0e4..734d2a4 100644 --- a/api/test_app.py +++ b/api/test_app.py @@ -1,10 +1,14 @@ """Tests for FastAPI app /api/duties.""" + +import time from unittest.mock import patch import pytest from fastapi.testclient import TestClient +import config from api.app import app +from api.test_telegram_auth import _make_init_data @pytest.fixture @@ -101,3 +105,28 @@ def test_duties_200_with_allowed_user(mock_can_access, mock_validate, client): assert len(r.json()) == 1 assert r.json()[0]["full_name"] == "Иван Иванов" mock_fetch.assert_called_once_with("2025-01-01", "2025-01-31") + + +def test_duties_e2e_auth_real_validation(client, monkeypatch): + """E2E: valid initData + allowlist, no mocks on validate_init_data; full auth path.""" + test_token = "123:ABC" + test_username = "e2euser" + monkeypatch.setattr(config, "BOT_TOKEN", test_token) + monkeypatch.setattr(config, "ALLOWED_USERNAMES", {test_username}) + monkeypatch.setattr(config, "ADMIN_USERNAMES", set()) + monkeypatch.setattr(config, "INIT_DATA_MAX_AGE_SECONDS", 0) + init_data = _make_init_data( + {"id": 1, "username": test_username}, + test_token, + auth_date=int(time.time()), + ) + with patch("api.app._fetch_duties_response") as mock_fetch: + mock_fetch.return_value = [] + r = client.get( + "/api/duties", + params={"from": "2025-01-01", "to": "2025-01-31"}, + headers={"X-Telegram-Init-Data": init_data}, + ) + assert r.status_code == 200 + assert r.json() == [] + mock_fetch.assert_called_once_with("2025-01-01", "2025-01-31") diff --git a/api/test_telegram_auth.py b/api/test_telegram_auth.py index ab45442..1bd97c5 100644 --- a/api/test_telegram_auth.py +++ b/api/test_telegram_auth.py @@ -1,19 +1,25 @@ """Tests for api.telegram_auth.validate_init_data.""" + import hashlib import hmac import json -from urllib.parse import quote, urlencode +from urllib.parse import quote -import pytest from api.telegram_auth import validate_init_data -def _make_init_data(user: dict | None, bot_token: str) -> str: +def _make_init_data( + user: dict | None, + bot_token: str, + auth_date: int | None = None, +) -> str: """Build initData string with valid HMAC for testing.""" params = {} if user is not None: params["user"] = quote(json.dumps(user)) + if auth_date is not None: + params["auth_date"] = str(auth_date) pairs = sorted(params.items()) data_string = "\n".join(f"{k}={v}" for k, v in pairs) secret_key = hmac.new( @@ -82,3 +88,36 @@ def test_empty_bot_token_returns_none(): user = {"id": 1, "username": "u"} init_data = _make_init_data(user, "token") assert validate_init_data(init_data, "") is None + + +def test_auth_date_expiry_rejects_old_init_data(): + """When max_age_seconds is set, initData older than that is rejected.""" + import time as t + + bot_token = "123:ABC" + user = {"id": 1, "username": "testuser"} + # auth_date 100 seconds ago + old_ts = int(t.time()) - 100 + init_data = _make_init_data(user, bot_token, auth_date=old_ts) + assert validate_init_data(init_data, bot_token, max_age_seconds=60) is None + assert validate_init_data(init_data, bot_token, max_age_seconds=200) == "testuser" + + +def test_auth_date_expiry_accepts_fresh_init_data(): + """Fresh auth_date within max_age_seconds is accepted.""" + import time as t + + bot_token = "123:ABC" + user = {"id": 1, "username": "testuser"} + fresh_ts = int(t.time()) - 10 + init_data = _make_init_data(user, bot_token, auth_date=fresh_ts) + assert validate_init_data(init_data, bot_token, max_age_seconds=60) == "testuser" + + +def test_auth_date_expiry_requires_auth_date_when_max_age_set(): + """When max_age_seconds is set but auth_date is missing, return None.""" + bot_token = "123:ABC" + user = {"id": 1, "username": "testuser"} + init_data = _make_init_data(user, bot_token) # no auth_date + assert validate_init_data(init_data, bot_token, max_age_seconds=86400) is None + assert validate_init_data(init_data, bot_token) == "testuser" diff --git a/config.py b/config.py index d7741e2..ad5d620 100644 --- a/config.py +++ b/config.py @@ -1,4 +1,5 @@ """Load configuration from environment. Fail fast if BOT_TOKEN is missing.""" + import os from pathlib import Path @@ -8,7 +9,9 @@ load_dotenv() BOT_TOKEN = os.getenv("BOT_TOKEN") if not BOT_TOKEN: - raise SystemExit("BOT_TOKEN is not set. Copy .env.example to .env and set your token from @BotFather.") + raise SystemExit( + "BOT_TOKEN is not set. Copy .env.example to .env and set your token from @BotFather." + ) DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db") MINI_APP_BASE_URL = os.getenv("MINI_APP_BASE_URL", "").rstrip("/") @@ -17,17 +20,28 @@ DATA_DIR = Path(__file__).resolve().parent / "data" # Miniapp access: comma-separated Telegram usernames (no @). Empty = no one allowed. _raw_allowed = os.getenv("ALLOWED_USERNAMES", "").strip() -ALLOWED_USERNAMES = {s.strip().lstrip("@").lower() for s in _raw_allowed.split(",") if s.strip()} +ALLOWED_USERNAMES = { + s.strip().lstrip("@").lower() for s in _raw_allowed.split(",") if s.strip() +} _raw_admin = os.getenv("ADMIN_USERNAMES", "").strip() -ADMIN_USERNAMES = {s.strip().lstrip("@").lower() for s in _raw_admin.split(",") if s.strip()} +ADMIN_USERNAMES = { + s.strip().lstrip("@").lower() for s in _raw_admin.split(",") if s.strip() +} # Dev only: set to 1 to allow /api/duties without Telegram initData (insecure, no user check). MINI_APP_SKIP_AUTH = os.getenv("MINI_APP_SKIP_AUTH", "").strip() in ("1", "true", "yes") +# Optional replay protection: reject initData older than this many seconds. 0 = disabled (default). +INIT_DATA_MAX_AGE_SECONDS = int(os.getenv("INIT_DATA_MAX_AGE_SECONDS", "0")) + # CORS: comma-separated origins, or empty/"*" for allow all. For production, set to MINI_APP_BASE_URL or specific origins. _raw_cors = os.getenv("CORS_ORIGINS", "").strip() -CORS_ORIGINS = [_o.strip() for _o in _raw_cors.split(",") if _o.strip()] if _raw_cors and _raw_cors != "*" else ["*"] +CORS_ORIGINS = ( + [_o.strip() for _o in _raw_cors.split(",") if _o.strip()] + if _raw_cors and _raw_cors != "*" + else ["*"] +) def is_admin(username: str) -> bool: diff --git a/conftest.py b/conftest.py index 5141667..363c9d1 100644 --- a/conftest.py +++ b/conftest.py @@ -1,4 +1,5 @@ """Pytest configuration. Set BOT_TOKEN so config module can be imported.""" + import os # Set before any project code imports config (which requires BOT_TOKEN). diff --git a/db/__init__.py b/db/__init__.py index 6798cee..618ec9e 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -1,4 +1,5 @@ """Database layer: SQLAlchemy models, Pydantic schemas, repository, init.""" + from db.models import Base, User, Duty from db.schemas import UserCreate, UserInDb, DutyCreate, DutyInDb, DutyWithUser from db.session import get_engine, get_session_factory, get_session diff --git a/db/models.py b/db/models.py index d6c5374..7f879db 100644 --- a/db/models.py +++ b/db/models.py @@ -1,10 +1,12 @@ """SQLAlchemy ORM models for users and duties.""" + from sqlalchemy import ForeignKey, Integer, BigInteger, Text from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship class Base(DeclarativeBase): """Declarative base for all models.""" + pass @@ -12,7 +14,9 @@ class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - telegram_user_id: Mapped[int] = mapped_column(BigInteger, unique=True, nullable=False) + telegram_user_id: Mapped[int] = mapped_column( + BigInteger, unique=True, nullable=False + ) full_name: Mapped[str] = mapped_column(Text, nullable=False) username: Mapped[str | None] = mapped_column(Text, nullable=True) first_name: Mapped[str | None] = mapped_column(Text, nullable=True) @@ -25,7 +29,9 @@ class Duty(Base): __tablename__ = "duties" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False) + user_id: Mapped[int] = mapped_column( + Integer, ForeignKey("users.id"), nullable=False + ) start_at: Mapped[str] = mapped_column(Text, nullable=False) # ISO 8601 end_at: Mapped[str] = mapped_column(Text, nullable=False) # ISO 8601 diff --git a/db/repository.py b/db/repository.py index c603526..a0539d7 100644 --- a/db/repository.py +++ b/db/repository.py @@ -1,4 +1,5 @@ """Repository: get_or_create_user, get_duties, insert_duty.""" + from sqlalchemy.orm import Session from db.models import User, Duty diff --git a/db/schemas.py b/db/schemas.py index 3d38be7..9e048eb 100644 --- a/db/schemas.py +++ b/db/schemas.py @@ -1,4 +1,5 @@ """Pydantic schemas for API and validation.""" + from pydantic import BaseModel, ConfigDict @@ -23,7 +24,7 @@ class UserInDb(UserBase): class DutyBase(BaseModel): user_id: int start_at: str # ISO 8601 - end_at: str # ISO 8601 + end_at: str # ISO 8601 class DutyCreate(DutyBase): @@ -38,6 +39,7 @@ class DutyInDb(DutyBase): class DutyWithUser(DutyInDb): """Duty with full_name for calendar display.""" + full_name: str model_config = ConfigDict(from_attributes=True) diff --git a/db/session.py b/db/session.py index cf7d9c6..463c9e7 100644 --- a/db/session.py +++ b/db/session.py @@ -1,11 +1,11 @@ """SQLAlchemy engine and session factory.""" + from contextlib import contextmanager from typing import Generator from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker -from db.models import Base _engine = None _SessionLocal = None @@ -26,7 +26,9 @@ def get_engine(database_url: str): if _engine is None: _engine = create_engine( database_url, - connect_args={"check_same_thread": False} if "sqlite" in database_url else {}, + connect_args={"check_same_thread": False} + if "sqlite" in database_url + else {}, echo=False, ) return _engine diff --git a/handlers/__init__.py b/handlers/__init__.py index 4c2494a..13cf0f0 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -1,4 +1,5 @@ """Expose a single register_handlers(app) that registers all handlers.""" + from telegram.ext import Application from . import commands, errors diff --git a/handlers/commands.py b/handlers/commands.py index 80e6d37..183628f 100644 --- a/handlers/commands.py +++ b/handlers/commands.py @@ -1,4 +1,5 @@ """Command handlers: /start, /help; /start registers user and shows Calendar button.""" + import asyncio import config @@ -15,7 +16,10 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user if not user: return - full_name = " ".join(filter(None, [user.first_name or "", user.last_name or ""])).strip() or "User" + full_name = ( + " ".join(filter(None, [user.first_name or "", user.last_name or ""])).strip() + or "User" + ) telegram_user_id = user.id username = user.username first_name = user.first_name @@ -39,9 +43,16 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: text = "Привет! Я бот календаря дежурств. Используй /help для списка команд." if config.MINI_APP_BASE_URL: - keyboard = InlineKeyboardMarkup([ - [InlineKeyboardButton("📅 Календарь", web_app=WebAppInfo(url=config.MINI_APP_BASE_URL + "/app/"))], - ]) + keyboard = InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + "📅 Календарь", + web_app=WebAppInfo(url=config.MINI_APP_BASE_URL + "/app/"), + ) + ], + ] + ) await update.message.reply_text(text, reply_markup=keyboard) else: await update.message.reply_text(text) diff --git a/handlers/errors.py b/handlers/errors.py index 2a855a4..0526dbb 100644 --- a/handlers/errors.py +++ b/handlers/errors.py @@ -1,4 +1,5 @@ """Global error handler: log exception and notify user.""" + import logging from telegram import Update diff --git a/main.py b/main.py index 9b48401..5f2d1c2 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ """Single entry point: build Application, run HTTP server + polling. Migrations run in Docker entrypoint.""" + import asyncio import logging import threading @@ -18,6 +19,7 @@ logger = logging.getLogger(__name__) def _run_uvicorn(web_app, port: int) -> None: """Run uvicorn in a dedicated thread with its own event loop.""" import uvicorn + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) server = uvicorn.Server( @@ -31,6 +33,7 @@ def main() -> None: register_handlers(app) from api.app import app as web_app + t = threading.Thread( target=_run_uvicorn, args=(web_app, config.HTTP_PORT), diff --git a/tests/test_config.py b/tests/test_config.py index c2702f6..fd3d37a 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,5 +1,4 @@ """Tests for config.is_admin and config.can_access_miniapp.""" -import pytest import config