From f8aceabab5e3ea4d61ff7795d248f898c0afaddf Mon Sep 17 00:00:00 2001 From: Nikolay Tatarinov Date: Mon, 2 Mar 2026 13:07:13 +0300 Subject: [PATCH] feat: add trusted groups functionality for duty information - Introduced a new `trusted_groups` table to store groups authorized to receive duty information. - Implemented functions to add, remove, and check trusted groups in the database. - Enhanced command handlers to manage trusted groups, including `/trust_group` and `/untrust_group` commands for admin users. - Updated internationalization messages to support new commands and group status notifications. - Added unit tests for trusted groups repository functions to ensure correct behavior and data integrity. --- alembic/versions/009_trusted_groups.py | 32 ++ duty_teller/db/models.py | 10 + duty_teller/db/repository.py | 66 +++ duty_teller/handlers/__init__.py | 2 + duty_teller/handlers/commands.py | 2 + duty_teller/handlers/group_duty_pin.py | 171 +++++- duty_teller/i18n/messages.py | 26 + .../services/group_duty_pin_service.py | 39 ++ tests/test_handlers_group_duty_pin.py | 517 ++++++++++++++---- tests/test_trusted_groups_repository.py | 84 +++ 10 files changed, 837 insertions(+), 112 deletions(-) create mode 100644 alembic/versions/009_trusted_groups.py create mode 100644 tests/test_trusted_groups_repository.py diff --git a/alembic/versions/009_trusted_groups.py b/alembic/versions/009_trusted_groups.py new file mode 100644 index 0000000..8ea4005 --- /dev/null +++ b/alembic/versions/009_trusted_groups.py @@ -0,0 +1,32 @@ +"""Add trusted_groups table. + +Revision ID: 009 +Revises: 008 +Create Date: 2025-03-02 + +Table for groups authorized to receive duty information. +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "009" +down_revision: Union[str, None] = "008" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "trusted_groups", + sa.Column("chat_id", sa.BigInteger(), nullable=False), + sa.Column("added_by_user_id", sa.BigInteger(), nullable=True), + sa.Column("added_at", sa.Text(), nullable=False), + sa.PrimaryKeyConstraint("chat_id"), + ) + + +def downgrade() -> None: + op.drop_table("trusted_groups") diff --git a/duty_teller/db/models.py b/duty_teller/db/models.py index f2357f3..566003e 100644 --- a/duty_teller/db/models.py +++ b/duty_teller/db/models.py @@ -84,3 +84,13 @@ class GroupDutyPin(Base): chat_id: Mapped[int] = mapped_column(BigInteger, primary_key=True) message_id: Mapped[int] = mapped_column(Integer, nullable=False) + + +class TrustedGroup(Base): + """Groups authorized to receive duty information.""" + + __tablename__ = "trusted_groups" + + chat_id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + added_by_user_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True) + added_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/duty_teller/db/repository.py b/duty_teller/db/repository.py index 83884e8..2dee3d3 100644 --- a/duty_teller/db/repository.py +++ b/duty_teller/db/repository.py @@ -11,6 +11,7 @@ from duty_teller.db.models import ( User, Duty, GroupDutyPin, + TrustedGroup, CalendarSubscriptionToken, Role, ) @@ -593,6 +594,71 @@ def get_all_group_duty_pin_chat_ids(session: Session) -> list[int]: return [r[0] for r in rows] +def is_trusted_group(session: Session, chat_id: int) -> bool: + """Check if the chat is in the trusted groups list. + + Args: + session: DB session. + chat_id: Telegram chat id. + + Returns: + True if the group is trusted. + """ + return ( + session.query(TrustedGroup).filter(TrustedGroup.chat_id == chat_id).first() + is not None + ) + + +def add_trusted_group( + session: Session, chat_id: int, added_by_user_id: int | None = None +) -> TrustedGroup: + """Add a group to the trusted list. + + Args: + session: DB session. + chat_id: Telegram chat id. + added_by_user_id: Telegram user id of the admin who added the group (optional). + + Returns: + Created TrustedGroup instance. + """ + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + record = TrustedGroup( + chat_id=chat_id, + added_by_user_id=added_by_user_id, + added_at=now_iso, + ) + session.add(record) + session.commit() + session.refresh(record) + return record + + +def remove_trusted_group(session: Session, chat_id: int) -> None: + """Remove a group from the trusted list. + + Args: + session: DB session. + chat_id: Telegram chat id. + """ + session.query(TrustedGroup).filter(TrustedGroup.chat_id == chat_id).delete() + session.commit() + + +def get_all_trusted_group_ids(session: Session) -> list[int]: + """Return all chat_ids that are trusted. + + Args: + session: DB session. + + Returns: + List of trusted chat ids. + """ + rows = session.query(TrustedGroup.chat_id).all() + return [r[0] for r in rows] + + def set_user_phone( session: Session, telegram_user_id: int, phone: str | None ) -> User | None: diff --git a/duty_teller/handlers/__init__.py b/duty_teller/handlers/__init__.py index 7a12728..a9b23bb 100644 --- a/duty_teller/handlers/__init__.py +++ b/duty_teller/handlers/__init__.py @@ -22,4 +22,6 @@ def register_handlers(app: Application) -> None: app.add_handler(group_duty_pin.group_duty_pin_handler) app.add_handler(group_duty_pin.pin_duty_handler) app.add_handler(group_duty_pin.refresh_pin_handler) + app.add_handler(group_duty_pin.trust_group_handler) + app.add_handler(group_duty_pin.untrust_group_handler) app.add_error_handler(errors.error_handler) diff --git a/duty_teller/handlers/commands.py b/duty_teller/handlers/commands.py index 6533058..72b86a5 100644 --- a/duty_teller/handlers/commands.py +++ b/duty_teller/handlers/commands.py @@ -168,6 +168,8 @@ async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: if await is_admin_async(update.effective_user.id): lines.append(t(lang, "help.import_schedule")) lines.append(t(lang, "help.set_role")) + lines.append(t(lang, "help.trust_group")) + lines.append(t(lang, "help.untrust_group")) await update.message.reply_text("\n".join(lines)) diff --git a/duty_teller/handlers/group_duty_pin.py b/duty_teller/handlers/group_duty_pin.py index f4c2f47..c53da0e 100644 --- a/duty_teller/handlers/group_duty_pin.py +++ b/duty_teller/handlers/group_duty_pin.py @@ -13,6 +13,7 @@ from telegram.ext import ChatMemberHandler, CommandHandler, ContextTypes from duty_teller.db.session import session_scope from duty_teller.i18n import get_lang, t +from duty_teller.handlers.common import is_admin_async from duty_teller.services.group_duty_pin_service import ( get_duty_message_text, get_message_id, @@ -21,6 +22,9 @@ from duty_teller.services.group_duty_pin_service import ( save_pin, delete_pin, get_all_pin_chat_ids, + is_group_trusted, + trust_group, + untrust_group, ) logger = logging.getLogger(__name__) @@ -62,6 +66,37 @@ def _sync_get_message_id(chat_id: int) -> int | None: return get_message_id(session, chat_id) +def _sync_is_trusted(chat_id: int) -> bool: + """Check if the group is trusted (sync wrapper for handlers).""" + with session_scope(config.DATABASE_URL) as session: + return is_group_trusted(session, chat_id) + + +def _sync_trust_group(chat_id: int, added_by_user_id: int | None) -> bool: + """Add group to trusted list. Returns True if already trusted (no-op).""" + with session_scope(config.DATABASE_URL) as session: + if is_group_trusted(session, chat_id): + return True + trust_group(session, chat_id, added_by_user_id) + return False + + +def _sync_untrust_group(chat_id: int) -> tuple[bool, int | None]: + """Remove group from trusted list. + + Returns: + (was_trusted, message_id): was_trusted False if group was not in list; + message_id of pinned message if any (for cleanup), else None. + """ + with session_scope(config.DATABASE_URL) as session: + if not is_group_trusted(session, chat_id): + return (False, None) + message_id = get_message_id(session, chat_id) + delete_pin(session, chat_id) + untrust_group(session, chat_id) + return (True, message_id) + + async def _schedule_next_update( application, chat_id: int, when_utc: datetime | None ) -> None: @@ -102,17 +137,40 @@ async def _schedule_next_update( async def _refresh_pin_for_chat( context: ContextTypes.DEFAULT_TYPE, chat_id: int -) -> Literal["updated", "no_message", "failed"]: +) -> Literal["updated", "no_message", "failed", "untrusted"]: """Refresh pinned duty message: send new message, unpin old, pin new, save new message_id, delete old. Uses single DB session for message_id, text, next_shift_end (consolidated). + If the group is no longer trusted, removes pin record, job, and message; returns "untrusted". Returns: "updated" if the message was sent, pinned and saved successfully; "no_message" if there is no pin record for this chat; - "failed" if send_message or permissions failed. + "failed" if send_message or permissions failed; + "untrusted" if the group was removed from trusted list (pin record and message cleaned up). """ loop = asyncio.get_running_loop() + trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id) + if not trusted: + old_message_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id) + await loop.run_in_executor(None, _sync_delete_pin, chat_id) + name = f"{JOB_NAME_PREFIX}{chat_id}" + if context.application.job_queue: + for job in context.application.job_queue.get_jobs_by_name(name): + job.schedule_removal() + if old_message_id is not None: + try: + await context.bot.unpin_chat_message(chat_id=chat_id) + except (BadRequest, Forbidden): + pass + try: + await context.bot.delete_message( + chat_id=chat_id, message_id=old_message_id + ) + except (BadRequest, Forbidden): + pass + logger.info("Chat_id=%s no longer trusted, removed pin record and job", chat_id) + return "untrusted" message_id, text, next_end = await loop.run_in_executor( None, lambda: _sync_get_pin_refresh_data(chat_id, config.DEFAULT_LANGUAGE), @@ -186,6 +244,17 @@ async def my_chat_member_handler( ChatMemberStatus.BANNED, ): loop = asyncio.get_running_loop() + trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id) + if not trusted: + lang = get_lang(update.effective_user) + try: + await context.bot.send_message( + chat_id=chat_id, + text=t(lang, "group.not_trusted"), + ) + except (BadRequest, Forbidden): + pass + return lang = get_lang(update.effective_user) text = await loop.run_in_executor( None, lambda: _get_duty_message_text_sync(lang) @@ -255,6 +324,10 @@ async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No return chat_id = chat.id loop = asyncio.get_running_loop() + trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id) + if not trusted: + await update.message.reply_text(t(lang, "group.not_trusted")) + return message_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id) if message_id is None: text = await loop.run_in_executor( @@ -312,13 +385,107 @@ async def refresh_pin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> await update.message.reply_text(t(lang, "refresh_pin.group_only")) return chat_id = chat.id + loop = asyncio.get_running_loop() + trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id) + if not trusted: + await update.message.reply_text(t(lang, "group.not_trusted")) + return result = await _refresh_pin_for_chat(context, chat_id) await update.message.reply_text(t(lang, f"refresh_pin.{result}")) +async def trust_group_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle /trust_group: add current group to trusted list (admin only).""" + if not update.message or not update.effective_chat or not update.effective_user: + return + chat = update.effective_chat + lang = get_lang(update.effective_user) + if chat.type not in ("group", "supergroup"): + await update.message.reply_text(t(lang, "trust_group.group_only")) + return + if not await is_admin_async(update.effective_user.id): + await update.message.reply_text(t(lang, "import.admin_only")) + return + chat_id = chat.id + loop = asyncio.get_running_loop() + already_trusted = await loop.run_in_executor( + None, + lambda: _sync_trust_group(chat_id, update.effective_user.id if update.effective_user else None), + ) + if already_trusted: + await update.message.reply_text(t(lang, "trust_group.already_trusted")) + return + await update.message.reply_text(t(lang, "trust_group.added")) + message_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id) + if message_id is None: + text = await loop.run_in_executor( + None, lambda: _get_duty_message_text_sync(lang) + ) + try: + msg = await context.bot.send_message(chat_id=chat_id, text=text) + except (BadRequest, Forbidden) as e: + logger.warning( + "Failed to send duty message after trust_group chat_id=%s: %s", + chat_id, + e, + ) + return + try: + await context.bot.pin_chat_message( + chat_id=chat_id, + message_id=msg.message_id, + disable_notification=not config.DUTY_PIN_NOTIFY, + ) + except (BadRequest, Forbidden) as e: + logger.warning( + "Failed to pin message after trust_group chat_id=%s: %s", chat_id, e + ) + await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id) + next_end = await loop.run_in_executor(None, _get_next_shift_end_sync) + await _schedule_next_update(context.application, chat_id, next_end) + + +async def untrust_group_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle /untrust_group: remove current group from trusted list (admin only).""" + if not update.message or not update.effective_chat or not update.effective_user: + return + chat = update.effective_chat + lang = get_lang(update.effective_user) + if chat.type not in ("group", "supergroup"): + await update.message.reply_text(t(lang, "untrust_group.group_only")) + return + if not await is_admin_async(update.effective_user.id): + await update.message.reply_text(t(lang, "import.admin_only")) + return + chat_id = chat.id + loop = asyncio.get_running_loop() + was_trusted, message_id = await loop.run_in_executor( + None, _sync_untrust_group, chat_id + ) + if not was_trusted: + await update.message.reply_text(t(lang, "untrust_group.not_trusted")) + return + name = f"{JOB_NAME_PREFIX}{chat_id}" + if context.application.job_queue: + for job in context.application.job_queue.get_jobs_by_name(name): + job.schedule_removal() + if message_id is not None: + try: + await context.bot.unpin_chat_message(chat_id=chat_id) + except (BadRequest, Forbidden): + pass + try: + await context.bot.delete_message(chat_id=chat_id, message_id=message_id) + except (BadRequest, Forbidden): + pass + await update.message.reply_text(t(lang, "untrust_group.removed")) + + group_duty_pin_handler = ChatMemberHandler( my_chat_member_handler, ChatMemberHandler.MY_CHAT_MEMBER, ) pin_duty_handler = CommandHandler("pin_duty", pin_duty_cmd) refresh_pin_handler = CommandHandler("refresh_pin", refresh_pin_cmd) +trust_group_handler = CommandHandler("trust_group", trust_group_cmd) +untrust_group_handler = CommandHandler("untrust_group", untrust_group_cmd) diff --git a/duty_teller/i18n/messages.py b/duty_teller/i18n/messages.py index bee5409..e14a566 100644 --- a/duty_teller/i18n/messages.py +++ b/duty_teller/i18n/messages.py @@ -18,6 +18,19 @@ MESSAGES: dict[str, dict[str, str]] = { "refresh_pin.no_message": "There is no pinned duty message to refresh in this chat.", "refresh_pin.updated": "Pinned duty message updated.", "refresh_pin.failed": "Could not update the pinned message (permissions or edit error).", + "refresh_pin.untrusted": "Group was removed from trusted list; pin record cleared.", + "trust_group.added": "Group added to trusted list.", + "trust_group.already_trusted": "This group is already trusted.", + "trust_group.group_only": "The /trust_group command works only in groups.", + "untrust_group.removed": "Group removed from trusted list.", + "untrust_group.not_trusted": "This group is not in the trusted list.", + "untrust_group.group_only": "The /untrust_group command works only in groups.", + "group.not_trusted": ( + "This group is not authorized to receive duty data. " + "An administrator can add the group with /trust_group." + ), + "help.trust_group": "/trust_group — In a group: add group to trusted list (admin only)", + "help.untrust_group": "/untrust_group — In a group: remove group from trusted list (admin only)", "calendar_link.private_only": "The /calendar_link command is only available in private chat.", "calendar_link.access_denied": "Access denied.", "calendar_link.success": ( @@ -92,6 +105,19 @@ MESSAGES: dict[str, dict[str, str]] = { "refresh_pin.no_message": "В этом чате нет закреплённого сообщения о дежурстве для обновления.", "refresh_pin.updated": "Закреплённое сообщение о дежурстве обновлено.", "refresh_pin.failed": "Не удалось обновить закреплённое сообщение (права или ошибка редактирования).", + "refresh_pin.untrusted": "Группа удалена из доверенных; запись о закреплении сброшена.", + "trust_group.added": "Группа добавлена в доверенные.", + "trust_group.already_trusted": "Эта группа уже в доверенных.", + "trust_group.group_only": "Команда /trust_group работает только в группах.", + "untrust_group.removed": "Группа удалена из доверенных.", + "untrust_group.not_trusted": "Эта группа не в доверенных.", + "untrust_group.group_only": "Команда /untrust_group работает только в группах.", + "group.not_trusted": ( + "Эта группа не авторизована для получения данных дежурных. " + "Администратор может добавить группу командой /trust_group." + ), + "help.trust_group": "/trust_group — В группе: добавить группу в доверенные (только админ)", + "help.untrust_group": "/untrust_group — В группе: удалить группу из доверенных (только админ)", "calendar_link.private_only": "Команда /calendar_link доступна только в личке.", "calendar_link.access_denied": "Доступ запрещён.", "calendar_link.success": ( diff --git a/duty_teller/services/group_duty_pin_service.py b/duty_teller/services/group_duty_pin_service.py index 42f914f..b824660 100644 --- a/duty_teller/services/group_duty_pin_service.py +++ b/duty_teller/services/group_duty_pin_service.py @@ -13,6 +13,9 @@ from duty_teller.db.repository import ( save_group_duty_pin, delete_group_duty_pin, get_all_group_duty_pin_chat_ids, + is_trusted_group, + add_trusted_group, + remove_trusted_group, ) from duty_teller.i18n import t from duty_teller.utils.dates import parse_utc_iso @@ -164,3 +167,39 @@ def get_all_pin_chat_ids(session: Session) -> list[int]: List of chat ids. """ return get_all_group_duty_pin_chat_ids(session) + + +def is_group_trusted(session: Session, chat_id: int) -> bool: + """Check if the group is in the trusted list. + + Args: + session: DB session. + chat_id: Telegram chat id. + + Returns: + True if the group is trusted. + """ + return is_trusted_group(session, chat_id) + + +def trust_group( + session: Session, chat_id: int, added_by_user_id: int | None = None +) -> None: + """Add the group to the trusted list. + + Args: + session: DB session. + chat_id: Telegram chat id. + added_by_user_id: Telegram user id of the admin who added the group (optional). + """ + add_trusted_group(session, chat_id, added_by_user_id) + + +def untrust_group(session: Session, chat_id: int) -> None: + """Remove the group from the trusted list. + + Args: + session: DB session. + chat_id: Telegram chat id. + """ + remove_trusted_group(session, chat_id) diff --git a/tests/test_handlers_group_duty_pin.py b/tests/test_handlers_group_duty_pin.py index cd0abd8..ccfb7ae 100644 --- a/tests/test_handlers_group_duty_pin.py +++ b/tests/test_handlers_group_duty_pin.py @@ -144,12 +144,13 @@ async def test_update_group_pin_sends_new_unpins_pins_saves_schedules_next(): context.application.job_queue.run_once = MagicMock() with patch.object(config, "DUTY_PIN_NOTIFY", True): - with patch.object( - mod, "_sync_get_pin_refresh_data", return_value=(1, "Current duty", None) - ): - with patch.object(mod, "_schedule_next_update", AsyncMock()): - with patch.object(mod, "_sync_save_pin") as mock_save: - await mod.update_group_pin(context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_sync_get_pin_refresh_data", return_value=(1, "Current duty", None) + ): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + with patch.object(mod, "_sync_save_pin") as mock_save: + await mod.update_group_pin(context) context.bot.send_message.assert_called_once_with(chat_id=123, text="Current duty") context.bot.unpin_chat_message.assert_called_once_with(chat_id=123) context.bot.pin_chat_message.assert_called_once_with( @@ -178,13 +179,14 @@ async def test_update_group_pin_delete_message_raises_bad_request_still_schedule context.application.job_queue.run_once = MagicMock() with patch.object(config, "DUTY_PIN_NOTIFY", True): - with patch.object( - mod, "_sync_get_pin_refresh_data", return_value=(1, "Current duty", None) - ): - with patch.object(mod, "_schedule_next_update", AsyncMock()) as mock_schedule: - with patch.object(mod, "_sync_save_pin") as mock_save: - with patch.object(mod, "logger") as mock_logger: - await mod.update_group_pin(context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_sync_get_pin_refresh_data", return_value=(1, "Current duty", None) + ): + with patch.object(mod, "_schedule_next_update", AsyncMock()) as mock_schedule: + with patch.object(mod, "_sync_save_pin") as mock_save: + with patch.object(mod, "logger") as mock_logger: + await mod.update_group_pin(context) mock_save.assert_called_once_with(123, 999) mock_schedule.assert_called_once_with(context.application, 123, None) mock_logger.warning.assert_called_once() @@ -200,10 +202,11 @@ async def test_update_group_pin_no_message_id_skips(): context.bot = MagicMock() context.bot.send_message = AsyncMock() - with patch.object( - mod, "_sync_get_pin_refresh_data", return_value=(None, "No duty", None) - ): - await mod.update_group_pin(context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_sync_get_pin_refresh_data", return_value=(None, "No duty", None) + ): + await mod.update_group_pin(context) context.bot.send_message.assert_not_called() @@ -219,11 +222,12 @@ async def test_update_group_pin_send_raises_no_unpin_pin_schedule_still_called() context.bot.pin_chat_message = AsyncMock() context.application = MagicMock() - with patch.object( - mod, "_sync_get_pin_refresh_data", return_value=(2, "Text", None) - ): - with patch.object(mod, "_schedule_next_update", AsyncMock()) as mock_schedule: - await mod.update_group_pin(context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_sync_get_pin_refresh_data", return_value=(2, "Text", None) + ): + with patch.object(mod, "_schedule_next_update", AsyncMock()) as mock_schedule: + await mod.update_group_pin(context) context.bot.unpin_chat_message.assert_not_called() context.bot.pin_chat_message.assert_not_called() mock_schedule.assert_called_once_with(context.application, 111, None) @@ -246,15 +250,16 @@ async def test_update_group_pin_repin_raises_still_schedules_next(): context.application = MagicMock() with patch.object(config, "DUTY_PIN_NOTIFY", True): - with patch.object( - mod, "_sync_get_pin_refresh_data", return_value=(3, "Text", None) - ): + with patch.object(mod, "_sync_is_trusted", return_value=True): with patch.object( - mod, "_schedule_next_update", AsyncMock() - ) as mock_schedule: - with patch.object(mod, "_sync_save_pin") as mock_save: - with patch.object(mod, "logger") as mock_logger: - await mod.update_group_pin(context) + mod, "_sync_get_pin_refresh_data", return_value=(3, "Text", None) + ): + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + with patch.object(mod, "_sync_save_pin") as mock_save: + with patch.object(mod, "logger") as mock_logger: + await mod.update_group_pin(context) context.bot.send_message.assert_called_once_with(chat_id=222, text="Text") mock_save.assert_not_called() mock_logger.warning.assert_called_once() @@ -278,14 +283,15 @@ async def test_update_group_pin_duty_pin_notify_false_pins_silent(): context.application = MagicMock() with patch.object(config, "DUTY_PIN_NOTIFY", False): - with patch.object( - mod, "_sync_get_pin_refresh_data", return_value=(4, "Text", None) - ): + with patch.object(mod, "_sync_is_trusted", return_value=True): with patch.object( - mod, "_schedule_next_update", AsyncMock() - ) as mock_schedule: - with patch.object(mod, "_sync_save_pin") as mock_save: - await mod.update_group_pin(context) + mod, "_sync_get_pin_refresh_data", return_value=(4, "Text", None) + ): + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + with patch.object(mod, "_sync_save_pin") as mock_save: + await mod.update_group_pin(context) context.bot.send_message.assert_called_once_with(chat_id=333, text="Text") context.bot.unpin_chat_message.assert_called_once_with(chat_id=333) context.bot.pin_chat_message.assert_called_once_with( @@ -331,16 +337,41 @@ async def test_pin_duty_cmd_group_pins_and_replies_pinned(): context.bot.pin_chat_message = AsyncMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_sync_get_message_id", return_value=5): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Pinned" - await mod.pin_duty_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_sync_get_message_id", return_value=5): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Pinned" + await mod.pin_duty_cmd(update, context) context.bot.pin_chat_message.assert_called_once_with( chat_id=100, message_id=5, disable_notification=True ) update.message.reply_text.assert_called_once_with("Pinned") +@pytest.mark.asyncio +async def test_pin_duty_cmd_untrusted_group_rejects(): + """pin_duty_cmd in untrusted group -> reply group.not_trusted, no send/pin.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + context = MagicMock() + context.bot = MagicMock() + context.bot.send_message = AsyncMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_sync_is_trusted", return_value=False): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Not authorized" + await mod.pin_duty_cmd(update, context) + update.message.reply_text.assert_called_once_with("Not authorized") + mock_t.assert_called_with("en", "group.not_trusted") + context.bot.send_message.assert_not_called() + + @pytest.mark.asyncio async def test_pin_duty_cmd_no_message_id_creates_sends_pins_saves_schedules_replies_pinned(): """pin_duty_cmd: no pin record -> send_message, pin, save_pin, schedule, reply pinned.""" @@ -363,20 +394,21 @@ async def test_pin_duty_cmd_no_message_id_creates_sends_pins_saves_schedules_rep context.application.job_queue.run_once = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_sync_get_message_id", return_value=None): - with patch.object( - mod, "_get_duty_message_text_sync", return_value="Duty text" - ): - with patch.object(mod, "_sync_save_pin") as mock_save: - with patch.object( - mod, "_get_next_shift_end_sync", return_value=None - ): - with patch.object(mod, "_schedule_next_update", AsyncMock()): - with patch( - "duty_teller.handlers.group_duty_pin.t" - ) as mock_t: - mock_t.return_value = "Pinned" - await mod.pin_duty_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_sync_get_message_id", return_value=None): + with patch.object( + mod, "_get_duty_message_text_sync", return_value="Duty text" + ): + with patch.object(mod, "_sync_save_pin") as mock_save: + with patch.object( + mod, "_get_next_shift_end_sync", return_value=None + ): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + with patch( + "duty_teller.handlers.group_duty_pin.t" + ) as mock_t: + mock_t.return_value = "Pinned" + await mod.pin_duty_cmd(update, context) context.bot.send_message.assert_called_once_with(chat_id=100, text="Duty text") context.bot.pin_chat_message.assert_called_once_with( chat_id=100, message_id=42, disable_notification=True @@ -402,15 +434,16 @@ async def test_pin_duty_cmd_no_message_id_send_message_raises_replies_failed(): context.application = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_sync_get_message_id", return_value=None): - with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): - with patch.object(mod, "_sync_save_pin") as mock_save: - with patch.object( - mod, "_schedule_next_update", AsyncMock() - ) as mock_schedule: - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Failed" - await mod.pin_duty_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_sync_get_message_id", return_value=None): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): + with patch.object(mod, "_sync_save_pin") as mock_save: + with patch.object( + mod, "_schedule_next_update", AsyncMock() + ) as mock_schedule: + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Failed" + await mod.pin_duty_cmd(update, context) update.message.reply_text.assert_called_once_with("Failed") mock_t.assert_called_with("en", "pin_duty.failed") mock_save.assert_not_called() @@ -439,18 +472,19 @@ async def test_pin_duty_cmd_no_message_id_pin_raises_saves_and_replies_could_not context.application.job_queue.run_once = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_sync_get_message_id", return_value=None): - with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): - with patch.object(mod, "_sync_save_pin") as mock_save: - with patch.object( - mod, "_get_next_shift_end_sync", return_value=None - ): - with patch.object(mod, "_schedule_next_update", AsyncMock()): - with patch( - "duty_teller.handlers.group_duty_pin.t" - ) as mock_t: - mock_t.return_value = "Make me admin to pin" - await mod.pin_duty_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_sync_get_message_id", return_value=None): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): + with patch.object(mod, "_sync_save_pin") as mock_save: + with patch.object( + mod, "_get_next_shift_end_sync", return_value=None + ): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + with patch( + "duty_teller.handlers.group_duty_pin.t" + ) as mock_t: + mock_t.return_value = "Make me admin to pin" + await mod.pin_duty_cmd(update, context) context.bot.send_message.assert_called_once_with(chat_id=100, text="Duty") mock_save.assert_called_once_with(100, 43) update.message.reply_text.assert_called_once_with("Make me admin to pin") @@ -474,10 +508,11 @@ async def test_pin_duty_cmd_pin_raises_replies_failed(): ) with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_sync_get_message_id", return_value=5): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Failed to pin" - await mod.pin_duty_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_sync_get_message_id", return_value=5): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Failed to pin" + await mod.pin_duty_cmd(update, context) update.message.reply_text.assert_called_once_with("Failed to pin") mock_t.assert_called_with("en", "pin_duty.failed") @@ -518,12 +553,13 @@ async def test_refresh_pin_cmd_group_updated_replies_updated(): context = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object( - mod, "_refresh_pin_for_chat", AsyncMock(return_value="updated") - ): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Updated" - await mod.refresh_pin_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_refresh_pin_for_chat", AsyncMock(return_value="updated") + ): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Updated" + await mod.refresh_pin_cmd(update, context) update.message.reply_text.assert_called_once_with("Updated") mock_t.assert_called_with("en", "refresh_pin.updated") @@ -541,12 +577,13 @@ async def test_refresh_pin_cmd_group_no_message_replies_no_message(): context = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object( - mod, "_refresh_pin_for_chat", AsyncMock(return_value="no_message") - ): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "No message" - await mod.refresh_pin_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_refresh_pin_for_chat", AsyncMock(return_value="no_message") + ): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "No message" + await mod.refresh_pin_cmd(update, context) update.message.reply_text.assert_called_once_with("No message") mock_t.assert_called_with("en", "refresh_pin.no_message") @@ -564,16 +601,40 @@ async def test_refresh_pin_cmd_group_edit_raises_replies_failed(): context = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object( - mod, "_refresh_pin_for_chat", AsyncMock(return_value="failed") - ): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Failed" - await mod.refresh_pin_cmd(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object( + mod, "_refresh_pin_for_chat", AsyncMock(return_value="failed") + ): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Failed" + await mod.refresh_pin_cmd(update, context) update.message.reply_text.assert_called_once_with("Failed") mock_t.assert_called_with("en", "refresh_pin.failed") +@pytest.mark.asyncio +async def test_refresh_pin_cmd_untrusted_group_rejects(): + """refresh_pin_cmd in untrusted group -> reply group.not_trusted, _refresh_pin_for_chat not called.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + context = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_sync_is_trusted", return_value=False): + with patch.object(mod, "_refresh_pin_for_chat", AsyncMock()) as mock_refresh: + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Not authorized" + await mod.refresh_pin_cmd(update, context) + update.message.reply_text.assert_called_once_with("Not authorized") + mock_t.assert_called_with("en", "group.not_trusted") + mock_refresh.assert_not_called() + + # --- my_chat_member_handler --- @@ -622,11 +683,67 @@ async def test_my_chat_member_handler_bot_added_sends_pins_and_schedules(): context.application.job_queue.run_once = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty text"): - with patch.object(mod, "_sync_save_pin"): - with patch.object(mod, "_get_next_shift_end_sync", return_value=None): - with patch.object(mod, "_schedule_next_update", AsyncMock()): - await mod.my_chat_member_handler(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty text"): + with patch.object(mod, "_sync_save_pin"): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + await mod.my_chat_member_handler(update, context) + context.bot.send_message.assert_called_once_with(chat_id=200, text="Duty text") + context.bot.pin_chat_message.assert_called_once_with( + chat_id=200, message_id=42, disable_notification=True + ) + + +@pytest.mark.asyncio +async def test_my_chat_member_handler_untrusted_group_does_not_send_duty(): + """my_chat_member_handler: bot added to untrusted group -> send group.not_trusted only, no duty message/pin/schedule.""" + update = _make_my_chat_member_update( + old_status=ChatMemberStatus.LEFT, + new_status=ChatMemberStatus.ADMINISTRATOR, + chat_id=200, + bot_id=999, + ) + context = MagicMock() + context.bot = MagicMock() + context.bot.id = 999 + context.bot.send_message = AsyncMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_sync_is_trusted", return_value=False): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Not authorized" + await mod.my_chat_member_handler(update, context) + context.bot.send_message.assert_called_once_with(chat_id=200, text="Not authorized") + mock_t.assert_called_with("en", "group.not_trusted") + + +@pytest.mark.asyncio +async def test_my_chat_member_handler_trusted_group_sends_duty(): + """my_chat_member_handler: bot added to trusted group -> send duty, pin, schedule (same as test_my_chat_member_handler_bot_added_sends_pins_and_schedules).""" + update = _make_my_chat_member_update( + old_status=ChatMemberStatus.LEFT, + new_status=ChatMemberStatus.ADMINISTRATOR, + chat_id=200, + bot_id=999, + ) + context = MagicMock() + context.bot = MagicMock() + context.bot.id = 999 + context.bot.send_message = AsyncMock(return_value=MagicMock(message_id=42)) + context.bot.pin_chat_message = AsyncMock() + context.application = MagicMock() + context.application.job_queue = MagicMock() + context.application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) + context.application.job_queue.run_once = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty text"): + with patch.object(mod, "_sync_save_pin"): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + await mod.my_chat_member_handler(update, context) context.bot.send_message.assert_called_once_with(chat_id=200, text="Duty text") context.bot.pin_chat_message.assert_called_once_with( chat_id=200, message_id=42, disable_notification=True @@ -653,13 +770,14 @@ async def test_my_chat_member_handler_pin_raises_sends_could_not_pin(): context.application.job_queue.run_once = MagicMock() with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): - with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): - with patch.object(mod, "_sync_save_pin"): - with patch.object(mod, "_get_next_shift_end_sync", return_value=None): - with patch.object(mod, "_schedule_next_update", AsyncMock()): - with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: - mock_t.return_value = "Make me admin to pin" - await mod.my_chat_member_handler(update, context) + with patch.object(mod, "_sync_is_trusted", return_value=True): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty"): + with patch.object(mod, "_sync_save_pin"): + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Make me admin to pin" + await mod.my_chat_member_handler(update, context) assert context.bot.send_message.call_count >= 2 pin_hint_calls = [ c @@ -710,3 +828,182 @@ async def test_restore_group_pin_jobs_calls_schedule_for_each_chat(): assert mock_schedule.call_count == 2 mock_schedule.assert_any_call(application, 10, None) mock_schedule.assert_any_call(application, 20, None) + + +# --- _refresh_pin_for_chat untrusted --- + + +@pytest.mark.asyncio +async def test_refresh_pin_for_chat_untrusted_removes_pin(): + """_refresh_pin_for_chat: when group not trusted -> delete_pin, remove job, unpin/delete message, return untrusted.""" + context = MagicMock() + context.bot = MagicMock() + context.bot.unpin_chat_message = AsyncMock() + context.bot.delete_message = AsyncMock() + context.application = MagicMock() + context.application.job_queue = MagicMock() + mock_job = MagicMock() + context.application.job_queue.get_jobs_by_name = MagicMock(return_value=[mock_job]) + + with patch.object(mod, "_sync_is_trusted", return_value=False): + with patch.object(mod, "_sync_get_message_id", return_value=11): + with patch.object(mod, "_sync_delete_pin") as mock_delete_pin: + result = await mod._refresh_pin_for_chat(context, 100) + assert result == "untrusted" + mock_delete_pin.assert_called_once_with(100) + context.application.job_queue.get_jobs_by_name.assert_called_once_with("duty_pin_100") + mock_job.schedule_removal.assert_called_once() + context.bot.unpin_chat_message.assert_called_once_with(chat_id=100) + context.bot.delete_message.assert_called_once_with(chat_id=100, message_id=11) + + +# --- trust_group_cmd / untrust_group_cmd --- + + +@pytest.mark.asyncio +async def test_trust_group_cmd_non_admin_rejects(): + """trust_group_cmd: non-admin -> reply import.admin_only.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + update.effective_user.id = 111 + context = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "is_admin_async", AsyncMock(return_value=False)): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Admin only" + await mod.trust_group_cmd(update, context) + update.message.reply_text.assert_called_once_with("Admin only") + mock_t.assert_called_with("en", "import.admin_only") + + +@pytest.mark.asyncio +async def test_trust_group_cmd_admin_adds_group(): + """trust_group_cmd: admin in group, group not yet trusted -> _sync_trust_group, reply added, then send+pin if no pin.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + update.effective_user.id = 111 + context = MagicMock() + context.bot = MagicMock() + new_msg = MagicMock() + new_msg.message_id = 50 + context.bot.send_message = AsyncMock(return_value=new_msg) + context.bot.pin_chat_message = AsyncMock() + context.application = MagicMock() + context.application.job_queue = MagicMock() + context.application.job_queue.get_jobs_by_name = MagicMock(return_value=[]) + context.application.job_queue.run_once = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "is_admin_async", AsyncMock(return_value=True)): + with patch.object(mod, "_sync_trust_group", return_value=False): + with patch.object(mod, "_sync_get_message_id", return_value=None): + with patch.object(mod, "_get_duty_message_text_sync", return_value="Duty text"): + with patch.object(mod, "_sync_save_pin") as mock_save: + with patch.object(mod, "_get_next_shift_end_sync", return_value=None): + with patch.object(mod, "_schedule_next_update", AsyncMock()): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Added" + with patch.object( + config, "DUTY_PIN_NOTIFY", False + ): + await mod.trust_group_cmd(update, context) + update.message.reply_text.assert_any_call("Added") + mock_t.assert_any_call("en", "trust_group.added") + context.bot.send_message.assert_called_once_with(chat_id=100, text="Duty text") + context.bot.pin_chat_message.assert_called_once_with( + chat_id=100, message_id=50, disable_notification=True + ) + mock_save.assert_called_once_with(100, 50) + + +@pytest.mark.asyncio +async def test_trust_group_cmd_admin_already_trusted_replies_already_trusted(): + """trust_group_cmd: admin, group already trusted -> reply already_trusted, no send/pin.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + update.effective_user.id = 111 + context = MagicMock() + context.bot = MagicMock() + context.bot.send_message = AsyncMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "is_admin_async", AsyncMock(return_value=True)): + with patch.object(mod, "_sync_trust_group", return_value=True): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Already trusted" + await mod.trust_group_cmd(update, context) + update.message.reply_text.assert_called_once_with("Already trusted") + mock_t.assert_called_with("en", "trust_group.already_trusted") + context.bot.send_message.assert_not_called() + + +@pytest.mark.asyncio +async def test_untrust_group_cmd_removes_group(): + """untrust_group_cmd: admin, trusted group with pin -> remove from trusted, delete pin, remove job, unpin/delete message, reply removed.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + update.effective_user.id = 111 + context = MagicMock() + context.bot = MagicMock() + context.bot.unpin_chat_message = AsyncMock() + context.bot.delete_message = AsyncMock() + context.application = MagicMock() + mock_job = MagicMock() + context.application.job_queue = MagicMock() + context.application.job_queue.get_jobs_by_name = MagicMock(return_value=[mock_job]) + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "is_admin_async", AsyncMock(return_value=True)): + with patch.object(mod, "_sync_untrust_group", return_value=(True, 99)): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Removed" + await mod.untrust_group_cmd(update, context) + update.message.reply_text.assert_called_once_with("Removed") + mock_t.assert_called_with("en", "untrust_group.removed") + context.application.job_queue.get_jobs_by_name.assert_called_once_with("duty_pin_100") + mock_job.schedule_removal.assert_called_once() + context.bot.unpin_chat_message.assert_called_once_with(chat_id=100) + context.bot.delete_message.assert_called_once_with(chat_id=100, message_id=99) + + +@pytest.mark.asyncio +async def test_untrust_group_cmd_not_trusted_replies_not_trusted(): + """untrust_group_cmd: group not in trusted list -> reply not_trusted.""" + update = MagicMock() + update.message = MagicMock() + update.message.reply_text = AsyncMock() + update.effective_chat = MagicMock() + update.effective_chat.type = "group" + update.effective_chat.id = 100 + update.effective_user = MagicMock() + context = MagicMock() + + with patch("duty_teller.handlers.group_duty_pin.get_lang", return_value="en"): + with patch.object(mod, "is_admin_async", AsyncMock(return_value=True)): + with patch.object(mod, "_sync_untrust_group", return_value=(False, None)): + with patch("duty_teller.handlers.group_duty_pin.t") as mock_t: + mock_t.return_value = "Not trusted" + await mod.untrust_group_cmd(update, context) + update.message.reply_text.assert_called_once_with("Not trusted") + mock_t.assert_called_with("en", "untrust_group.not_trusted") diff --git a/tests/test_trusted_groups_repository.py b/tests/test_trusted_groups_repository.py new file mode 100644 index 0000000..2303859 --- /dev/null +++ b/tests/test_trusted_groups_repository.py @@ -0,0 +1,84 @@ +"""Unit tests for trusted_groups repository functions.""" + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from duty_teller.db.models import Base +from duty_teller.db.repository import ( + is_trusted_group, + add_trusted_group, + remove_trusted_group, + get_all_trusted_group_ids, +) + + +@pytest.fixture +def session(): + """In-memory SQLite session with all tables (including trusted_groups).""" + engine = create_engine( + "sqlite:///:memory:", connect_args={"check_same_thread": False} + ) + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine, autocommit=False, autoflush=False) + s = Session() + try: + yield s + finally: + s.close() + engine.dispose() + + +def test_is_trusted_group_empty_returns_false(session): + """is_trusted_group returns False when no record exists.""" + assert is_trusted_group(session, 100) is False + assert is_trusted_group(session, 200) is False + + +def test_add_trusted_group_creates_record(session): + """add_trusted_group creates a record and returns TrustedGroup.""" + record = add_trusted_group(session, 100, added_by_user_id=12345) + assert record.chat_id == 100 + assert record.added_by_user_id == 12345 + assert record.added_at is not None + + +def test_is_trusted_group_after_add_returns_true(session): + """is_trusted_group returns True after add_trusted_group.""" + add_trusted_group(session, 100) + assert is_trusted_group(session, 100) is True + assert is_trusted_group(session, 101) is False + + +def test_add_trusted_group_without_added_by_user_id(session): + """add_trusted_group accepts added_by_user_id None.""" + record = add_trusted_group(session, 200, added_by_user_id=None) + assert record.chat_id == 200 + assert record.added_by_user_id is None + + +def test_remove_trusted_group_removes_record(session): + """remove_trusted_group removes the record.""" + add_trusted_group(session, 100) + assert is_trusted_group(session, 100) is True + remove_trusted_group(session, 100) + assert is_trusted_group(session, 100) is False + + +def test_remove_trusted_group_idempotent(session): + """remove_trusted_group on non-existent chat_id does not raise.""" + remove_trusted_group(session, 999) + + +def test_get_all_trusted_group_ids_empty(session): + """get_all_trusted_group_ids returns empty list when no trusted groups.""" + assert get_all_trusted_group_ids(session) == [] + + +def test_get_all_trusted_group_ids_returns_added_chats(session): + """get_all_trusted_group_ids returns all trusted chat_ids.""" + add_trusted_group(session, 10) + add_trusted_group(session, 20) + add_trusted_group(session, 30) + ids = get_all_trusted_group_ids(session) + assert set(ids) == {10, 20, 30}