feat: implement role-based access control for miniapp
All checks were successful
CI / lint-and-test (push) Successful in 22s

- Introduced a new roles table in the database to manage user roles ('user' and 'admin') for access control.
- Updated the user model to include a foreign key reference to the roles table, allowing for role assignment.
- Enhanced command handlers to support the `/set_role` command for admins to assign roles to users.
- Refactored access control logic to utilize role checks instead of username/phone allowlists, improving security and maintainability.
- Updated documentation to reflect changes in access control mechanisms and role management.
- Added unit tests to ensure correct functionality of role assignment and access checks.
This commit is contained in:
2026-02-20 23:58:54 +03:00
parent d02d0a1835
commit 4824450088
18 changed files with 554 additions and 83 deletions

View File

@@ -1,6 +1,6 @@
"""Database layer: SQLAlchemy models, Pydantic schemas, repository, init."""
from duty_teller.db.models import Base, User, Duty
from duty_teller.db.models import Base, User, Duty, Role
from duty_teller.db.schemas import (
UserCreate,
UserInDb,
@@ -28,6 +28,7 @@ __all__ = [
"Base",
"User",
"Duty",
"Role",
"UserCreate",
"UserInDb",
"DutyCreate",

View File

@@ -10,6 +10,17 @@ class Base(DeclarativeBase):
pass
class Role(Base):
"""Role for access control: 'user' (miniapp access), 'admin' (admin actions)."""
__tablename__ = "roles"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
users: Mapped[list["User"]] = relationship("User", back_populates="role")
class User(Base):
"""Telegram user and display name; may have telegram_user_id=None for import-only users."""
@@ -27,7 +38,11 @@ class User(Base):
name_manually_edited: Mapped[bool] = mapped_column(
Boolean, nullable=False, server_default="0", default=False
)
role_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("roles.id"), nullable=True
)
role: Mapped["Role | None"] = relationship("Role", back_populates="users")
duties: Mapped[list["Duty"]] = relationship("Duty", back_populates="user")

View File

@@ -7,9 +7,19 @@ from datetime import datetime, timezone
from sqlalchemy.orm import Session
import duty_teller.config as config
from duty_teller.db.models import User, Duty, GroupDutyPin, CalendarSubscriptionToken
from duty_teller.db.models import (
User,
Duty,
GroupDutyPin,
CalendarSubscriptionToken,
Role,
)
from duty_teller.utils.dates import parse_utc_iso_naive, to_date_exclusive_iso
# Role names stored in DB (table roles).
ROLE_USER = "user"
ROLE_ADMIN = "admin"
def get_user_by_telegram_id(session: Session, telegram_user_id: int) -> User | None:
"""Find user by Telegram user ID.
@@ -24,22 +34,123 @@ def get_user_by_telegram_id(session: Session, telegram_user_id: int) -> User | N
return session.query(User).filter(User.telegram_user_id == telegram_user_id).first()
def get_user_by_username(session: Session, username: str) -> User | None:
"""Find user by Telegram username (case-insensitive, optional @ prefix).
Args:
session: DB session.
username: Telegram username with or without @.
Returns:
User or None if not found.
"""
from sqlalchemy import func
name = (username or "").strip().lstrip("@").lower()
if not name:
return None
return session.query(User).filter(func.lower(User.username) == name).first()
def get_user_role(session: Session, user_id: int) -> str | None:
"""Return role name for user by internal user id, or None if no role.
Args:
session: DB session.
user_id: Internal user id (users.id).
Returns:
Role name ('user' or 'admin') or None.
"""
user = session.get(User, user_id)
if not user or not user.role:
return None
return user.role.name
def is_admin_for_telegram_user(session: Session, telegram_user_id: int) -> bool:
"""Check if the Telegram user is admin (by username or by stored phone).
"""Check if the Telegram user is admin.
If user has a role in DB, returns True only for role 'admin'.
If user has no role in DB, fallback: True if in ADMIN_USERNAMES or ADMIN_PHONES.
Args:
session: DB session.
telegram_user_id: Telegram user id.
Returns:
True if user is in ADMIN_USERNAMES or their stored phone is in ADMIN_PHONES.
True if admin (by DB role or env fallback).
"""
user = get_user_by_telegram_id(session, telegram_user_id)
if not user:
return False
if user.role is not None:
return user.role.name == ROLE_ADMIN
return config.is_admin(user.username or "") or config.is_admin_by_phone(user.phone)
def can_access_miniapp_for_telegram_user(
session: Session, telegram_user_id: int
) -> bool:
"""Check if Telegram user can access the calendar miniapp.
Access if: user has role 'user' or 'admin' in DB, or (no role in DB and
env fallback: in ADMIN_USERNAMES or ADMIN_PHONES). No user in DB -> no access.
Args:
session: DB session.
telegram_user_id: Telegram user id.
Returns:
True if user may open the miniapp.
"""
user = get_user_by_telegram_id(session, telegram_user_id)
return can_access_miniapp_for_user(session, user) if user else False
def can_access_miniapp_for_user(session: Session, user: User | None) -> bool:
"""Check if user (already loaded) can access the calendar miniapp.
Access if: user has role 'user' or 'admin' in DB, or (no role in DB and
env fallback: in ADMIN_USERNAMES or ADMIN_PHONES).
Args:
session: DB session (unused; kept for API consistency).
user: User instance or None.
Returns:
True if user may open the miniapp.
"""
if not user:
return False
if user.role is not None:
return user.role.name in (ROLE_USER, ROLE_ADMIN)
return config.is_admin(user.username or "") or config.is_admin_by_phone(user.phone)
def set_user_role(session: Session, user_id: int, role_name: str) -> User | None:
"""Set user role by internal user id and role name.
Args:
session: DB session.
user_id: Internal user id (users.id).
role_name: 'user' or 'admin'.
Returns:
Updated User or None if user or role not found.
"""
user = session.get(User, user_id)
if not user:
return None
role = session.query(Role).filter(Role.name == role_name).first()
if not role:
return None
user.role_id = role.id
session.commit()
session.refresh(user)
return user
def get_or_create_user(
session: Session,
telegram_user_id: int,