From ae21883e1e5695cff6cefd3b03d5a61e46d3265f Mon Sep 17 00:00:00 2001 From: Nikolay Tatarinov Date: Fri, 20 Feb 2026 16:42:41 +0300 Subject: [PATCH] refactor: streamline configuration loading and enhance admin checks - Refactored the configuration loading in `config.py` to utilize a single source of truth through the `Settings` class, improving maintainability and clarity. - Introduced the `is_admin_for_telegram_user` function in `repository.py` to centralize admin checks based on usernames and phone numbers. - Updated command handlers to use the new admin check function, ensuring consistent access control across the application. - Enhanced error handling in the `error_handler` to log exceptions when sending error replies to users, improving debugging capabilities. - Improved the handling of user phone updates in `repository.py` to ensure proper normalization and validation of phone numbers. --- .coverage | Bin 53248 -> 53248 bytes duty_teller/config.py | 51 ++++++------------ duty_teller/db/repository.py | 26 +++++++-- duty_teller/handlers/commands.py | 9 +++- duty_teller/handlers/errors.py | 9 ++-- duty_teller/handlers/group_duty_pin.py | 2 +- duty_teller/handlers/import_duty_schedule.py | 27 ++++++++-- duty_teller/run.py | 1 + .../services/group_duty_pin_service.py | 4 +- 9 files changed, 79 insertions(+), 50 deletions(-) diff --git a/.coverage b/.coverage index fa9f5e67c81040a21acbd56d9c9aeedea4d5a54c..3824ee18f643ece7bd620ef5db1b36455d74a5a7 100644 GIT binary patch delta 154 zcmV;L0A>GxpaX!Q1F!~wG&KMZ`490A+7HGLqYso1eGgp^P7gi~`wr_4+YZwX#}2X% zst%J5iVkuPU=CLfN)A2_HM0>A`wf#ej>Zi^3k3lQfdUM=CV&J0lLn6<7AF7z-xB}; z00aO4000O80RR922ml0=JdYnB9tZ>h2|faXSCRmyrk4W%4Box{|2OFWCqM#BfG5BN Ivwe?)K**Ua+5i9m delta 153 zcmZozz}&Eac>`O6i5Ua`PyUbmxA>3oFXo@ZU&|lO@567y_lNHd-yOayd?)xe^R4Ea z!q?4L%$LX)!RO6q&u6w-P~Z>mWYfN5Tz1?njGT2mRjLh>Ir6$G%wu3+Pyk{E z2L=uX1`Y=Xg~@jP@=6LEK$T8RP2rOmmM@#m%)kRu!cg%%XSOt>Py&+;quS=${$>XN Dw4*74 diff --git a/duty_teller/config.py b/duty_teller/config.py index 96308d7..67b551c 100644 --- a/duty_teller/config.py +++ b/duty_teller/config.py @@ -119,42 +119,23 @@ class Settings: ) -# Module-level vars: no validation on import; entry point must check BOT_TOKEN when needed. -BOT_TOKEN = os.getenv("BOT_TOKEN") or "" -DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db") -MINI_APP_BASE_URL = os.getenv("MINI_APP_BASE_URL", "").rstrip("/") -HTTP_PORT = int(os.getenv("HTTP_PORT", "8080")) +# Single source of truth: load once at import; entry point must check BOT_TOKEN when needed. +_settings = Settings.from_env() -_raw_allowed = os.getenv("ALLOWED_USERNAMES", "").strip() -ALLOWED_USERNAMES = { - s.strip().lstrip("@").lower() for s in _raw_allowed.split(",") if s.strip() -} - -_raw_admin = os.getenv("ADMIN_USERNAMES", "").strip() -ADMIN_USERNAMES = { - s.strip().lstrip("@").lower() for s in _raw_admin.split(",") if s.strip() -} - -ALLOWED_PHONES = _parse_phone_list(os.getenv("ALLOWED_PHONES", "")) -ADMIN_PHONES = _parse_phone_list(os.getenv("ADMIN_PHONES", "")) - -MINI_APP_SKIP_AUTH = os.getenv("MINI_APP_SKIP_AUTH", "").strip() in ("1", "true", "yes") -INIT_DATA_MAX_AGE_SECONDS = int(os.getenv("INIT_DATA_MAX_AGE_SECONDS", "0")) - -_raw_cors = os.getenv("CORS_ORIGINS", "").strip() -CORS_ORIGINS = ( - [_o.strip() for _o in _raw_cors.split(",") if _o.strip()] - if _raw_cors and _raw_cors != "*" - else ["*"] -) - -EXTERNAL_CALENDAR_ICS_URL = os.getenv("EXTERNAL_CALENDAR_ICS_URL", "").strip() -DUTY_DISPLAY_TZ = ( - os.getenv("DUTY_DISPLAY_TZ", "Europe/Moscow").strip() or "Europe/Moscow" -) -DEFAULT_LANGUAGE = _normalize_default_language( - os.getenv("DEFAULT_LANGUAGE", "en").strip() -) +BOT_TOKEN = _settings.bot_token +DATABASE_URL = _settings.database_url +MINI_APP_BASE_URL = _settings.mini_app_base_url +HTTP_PORT = _settings.http_port +ALLOWED_USERNAMES = _settings.allowed_usernames +ADMIN_USERNAMES = _settings.admin_usernames +ALLOWED_PHONES = _settings.allowed_phones +ADMIN_PHONES = _settings.admin_phones +MINI_APP_SKIP_AUTH = _settings.mini_app_skip_auth +INIT_DATA_MAX_AGE_SECONDS = _settings.init_data_max_age_seconds +CORS_ORIGINS = _settings.cors_origins +EXTERNAL_CALENDAR_ICS_URL = _settings.external_calendar_ics_url +DUTY_DISPLAY_TZ = _settings.duty_display_tz +DEFAULT_LANGUAGE = _settings.default_language def is_admin(username: str) -> bool: diff --git a/duty_teller/db/repository.py b/duty_teller/db/repository.py index afb0fb4..c3775af 100644 --- a/duty_teller/db/repository.py +++ b/duty_teller/db/repository.py @@ -1,12 +1,12 @@ """Repository: get_or_create_user, get_duties, insert_duty, get_current_duty, group_duty_pins.""" import hashlib -import hmac import secrets from datetime import datetime, timedelta, timezone from sqlalchemy.orm import Session +import duty_teller.config as config from duty_teller.db.models import User, Duty, GroupDutyPin, CalendarSubscriptionToken @@ -23,6 +23,22 @@ def get_user_by_telegram_id(session: Session, telegram_user_id: int) -> User | N return session.query(User).filter(User.telegram_user_id == telegram_user_id).first() +def is_admin_for_telegram_user(session: Session, telegram_user_id: int) -> bool: + """Check if the Telegram user is admin (by username or by stored phone). + + Args: + session: DB session. + telegram_user_id: Telegram user id. + + Returns: + True if user is in ADMIN_USERNAMES or their stored phone is in ADMIN_PHONES. + """ + user = get_user_by_telegram_id(session, telegram_user_id) + if not user: + return False + return config.is_admin(user.username or "") or config.is_admin_by_phone(user.phone) + + def get_or_create_user( session: Session, telegram_user_id: int, @@ -279,9 +295,6 @@ def get_user_by_calendar_token(session: Session, token: str) -> User | None: ) if row is None: return None - # Constant-time compare to avoid timing leaks (token_hash is already hashed). - if not hmac.compare_digest(row[0].token_hash, token_hash_val): - return None return row[1] @@ -466,7 +479,10 @@ def set_user_phone( user = session.query(User).filter(User.telegram_user_id == telegram_user_id).first() if not user: return None - user.phone = phone + if phone is None or (isinstance(phone, str) and not phone.strip()): + user.phone = None + else: + user.phone = config.normalize_phone(phone) session.commit() session.refresh(user) return user diff --git a/duty_teller/handlers/commands.py b/duty_teller/handlers/commands.py index b3a8ca4..a0b2f46 100644 --- a/duty_teller/handlers/commands.py +++ b/duty_teller/handlers/commands.py @@ -11,6 +11,7 @@ from duty_teller.db.repository import ( get_or_create_user, set_user_phone, create_calendar_token, + is_admin_for_telegram_user, ) from duty_teller.i18n import get_lang, t from duty_teller.utils.user import build_full_name @@ -150,7 +151,13 @@ async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: t(lang, "help.calendar_link"), t(lang, "help.pin_duty"), ] - if config.is_admin(update.effective_user.username or ""): + + def check_admin() -> bool: + with session_scope(config.DATABASE_URL) as session: + return is_admin_for_telegram_user(session, update.effective_user.id) + + is_admin_user = await asyncio.get_running_loop().run_in_executor(None, check_admin) + if is_admin_user: lines.append(t(lang, "help.import_schedule")) await update.message.reply_text("\n".join(lines)) diff --git a/duty_teller/handlers/errors.py b/duty_teller/handlers/errors.py index bcb1c14..172d570 100644 --- a/duty_teller/handlers/errors.py +++ b/duty_teller/handlers/errors.py @@ -22,6 +22,9 @@ async def error_handler( """ logger.exception("Exception while handling an update") if isinstance(update, Update) and update.effective_message: - user = getattr(update, "effective_user", None) - lang = get_lang(user) if user else config.DEFAULT_LANGUAGE - await update.effective_message.reply_text(t(lang, "errors.generic")) + try: + user = getattr(update, "effective_user", None) + lang = get_lang(user) if user else config.DEFAULT_LANGUAGE + await update.effective_message.reply_text(t(lang, "errors.generic")) + except Exception: + logger.warning("Could not send error reply to user", exc_info=True) diff --git a/duty_teller/handlers/group_duty_pin.py b/duty_teller/handlers/group_duty_pin.py index 5ae27ba..275cfff 100644 --- a/duty_teller/handlers/group_duty_pin.py +++ b/duty_teller/handlers/group_duty_pin.py @@ -190,8 +190,8 @@ async def restore_group_pin_jobs(application) -> None: """Restore scheduled pin-update jobs for all chats that have a pinned message (on startup).""" loop = asyncio.get_running_loop() chat_ids = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync) + next_end = await loop.run_in_executor(None, _get_next_shift_end_sync) for chat_id in chat_ids: - next_end = await loop.run_in_executor(None, _get_next_shift_end_sync) await _schedule_next_update(application, chat_id, next_end) logger.info("Restored %s group pin jobs", len(chat_ids)) diff --git a/duty_teller/handlers/import_duty_schedule.py b/duty_teller/handlers/import_duty_schedule.py index a7b3c8e..9ced849 100644 --- a/duty_teller/handlers/import_duty_schedule.py +++ b/duty_teller/handlers/import_duty_schedule.py @@ -7,6 +7,7 @@ from telegram import Update from telegram.ext import CommandHandler, ContextTypes, MessageHandler, filters from duty_teller.db.session import session_scope +from duty_teller.db.repository import is_admin_for_telegram_user from duty_teller.i18n import get_lang, t from duty_teller.importers.duty_schedule import ( DutyScheduleParseError, @@ -23,7 +24,13 @@ async def import_duty_schedule_cmd( if not update.message or not update.effective_user: return lang = get_lang(update.effective_user) - if not config.is_admin(update.effective_user.username or ""): + + def check_admin() -> bool: + with session_scope(config.DATABASE_URL) as session: + return is_admin_for_telegram_user(session, update.effective_user.id) + + is_admin_user = await asyncio.get_running_loop().run_in_executor(None, check_admin) + if not is_admin_user: await update.message.reply_text(t(lang, "import.admin_only")) return context.user_data["awaiting_handover_time"] = True @@ -38,7 +45,13 @@ async def handle_handover_time_text( return if not context.user_data.get("awaiting_handover_time"): return - if not config.is_admin(update.effective_user.username or ""): + + def check_admin() -> bool: + with session_scope(config.DATABASE_URL) as session: + return is_admin_for_telegram_user(session, update.effective_user.id) + + is_admin_user = await asyncio.get_running_loop().run_in_executor(None, check_admin) + if not is_admin_user: return lang = get_lang(update.effective_user) text = update.message.text.strip() @@ -63,7 +76,15 @@ async def handle_duty_schedule_document( return lang = get_lang(update.effective_user) handover = context.user_data.get("handover_utc_time") - if not handover or not config.is_admin(update.effective_user.username or ""): + if not handover: + return + + def check_admin() -> bool: + with session_scope(config.DATABASE_URL) as session: + return is_admin_for_telegram_user(session, update.effective_user.id) + + is_admin_user = await asyncio.get_running_loop().run_in_executor(None, check_admin) + if not is_admin_user: return if not (update.message.document.file_name or "").lower().endswith(".json"): await update.message.reply_text(t(lang, "import.need_json")) diff --git a/duty_teller/run.py b/duty_teller/run.py index 4003ee0..ab31900 100644 --- a/duty_teller/run.py +++ b/duty_teller/run.py @@ -62,6 +62,7 @@ def _run_uvicorn(web_app, port: int) -> None: def main() -> None: """Build the bot and FastAPI, start uvicorn in a thread, run polling.""" require_bot_token() + # Optional: set bot menu button to open the Miniapp. Uncomment to enable: # _set_default_menu_button_webapp() app = ( ApplicationBuilder() diff --git a/duty_teller/services/group_duty_pin_service.py b/duty_teller/services/group_duty_pin_service.py index c28a8f0..6f914ce 100644 --- a/duty_teller/services/group_duty_pin_service.py +++ b/duty_teller/services/group_duty_pin_service.py @@ -1,7 +1,7 @@ """Group duty pin: current duty message text, next shift end, pin CRUD. All accept session.""" from datetime import datetime, timezone -from zoneinfo import ZoneInfo +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from sqlalchemy.orm import Session @@ -32,7 +32,7 @@ def format_duty_message(duty, user, tz_name: str, lang: str = "en") -> str: return t(lang, "duty.no_duty") try: tz = ZoneInfo(tz_name) - except Exception: + except ZoneInfoNotFoundError: tz = ZoneInfo("Europe/Moscow") tz_name = "Europe/Moscow" start_dt = datetime.fromisoformat(duty.start_at.replace("Z", "+00:00"))