"""Shared handler helpers (e.g. async admin check).""" import asyncio import duty_teller.config as config from duty_teller.cache import is_admin_cache from duty_teller.db.repository import is_admin_for_telegram_user from duty_teller.db.session import session_scope async def is_admin_async(telegram_user_id: int) -> bool: """Check if Telegram user is admin. Cached 60s. Invalidated on set_user_role. Args: telegram_user_id: Telegram user id. Returns: True if user is admin (DB role or env fallback). """ cache_key = ("is_admin", telegram_user_id) value, found = is_admin_cache.get(cache_key) if found: return value def _check() -> bool: with session_scope(config.DATABASE_URL) as session: return is_admin_for_telegram_user(session, telegram_user_id) result = await asyncio.get_running_loop().run_in_executor(None, _check) is_admin_cache.set(cache_key, result) return result def invalidate_is_admin_cache(telegram_user_id: int | None) -> None: """Invalidate is_admin cache for user. Call after set_user_role.""" if telegram_user_id is not None: is_admin_cache.invalidate(("is_admin", telegram_user_id))