- Added `bot_username` to settings for dynamic retrieval of the bot's username. - Implemented `_resolve_bot_username` function to fetch the bot's username if not set, improving user experience in group chats. - Updated `DutyWithUser` schema to include optional `phone` and `username` fields for enhanced duty information. - Enhanced API responses to include contact details for users, ensuring better communication. - Introduced a new current duty view in the web app, displaying active duty information along with contact options. - Updated CSS styles for better presentation of contact information in duty cards. - Added unit tests to verify the inclusion of contact details in API responses and the functionality of the current duty view.
525 lines
20 KiB
Python
525 lines
20 KiB
Python
"""Pinned duty message in groups: handle bot add/remove, schedule updates at shift end."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Literal
|
|
|
|
import duty_teller.config as config
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
|
|
from telegram.constants import ChatMemberStatus
|
|
from telegram.error import BadRequest, Forbidden
|
|
from telegram.ext import ChatMemberHandler, CommandHandler, ContextTypes
|
|
|
|
from duty_teller.db.session import session_scope
|
|
from duty_teller.i18n import get_lang, t
|
|
from duty_teller.handlers.common import is_admin_async
|
|
from duty_teller.services.group_duty_pin_service import (
|
|
get_duty_message_text,
|
|
get_message_id,
|
|
get_next_shift_end_utc,
|
|
get_pin_refresh_data,
|
|
save_pin,
|
|
delete_pin,
|
|
get_all_pin_chat_ids,
|
|
is_group_trusted,
|
|
trust_group,
|
|
untrust_group,
|
|
)
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Job names are built as f"{JOB_NAME_PREFIX}{chat_id}", one name per chat.
JOB_NAME_PREFIX = "duty_pin_"

# Minutes to wait before retrying a pin refresh when no next shift end is known.
RETRY_WHEN_NO_DUTY_MINUTES = 15
|
|
|
|
|
|
def _sync_get_pin_refresh_data(
    chat_id: int, lang: str = "en"
) -> tuple[int | None, str, datetime | None]:
    """Load everything a pin refresh needs inside a single DB session.

    Args:
        chat_id: Chat whose pin record is looked up.
        lang: Language code forwarded to ``get_pin_refresh_data``.

    Returns:
        The ``(message_id, duty_text, next_shift_end)`` tuple produced by
        ``get_pin_refresh_data`` for ``config.DUTY_DISPLAY_TZ``.
    """
    with session_scope(config.DATABASE_URL) as db:
        refresh_data = get_pin_refresh_data(
            db, chat_id, config.DUTY_DISPLAY_TZ, lang
        )
        return refresh_data
|
|
|
|
|
|
def _get_duty_message_text_sync(lang: str = "en") -> str:
    """Return the duty message text for ``lang`` (blocking DB access).

    Opens its own session on ``config.DATABASE_URL`` and delegates to
    ``get_duty_message_text`` with ``config.DUTY_DISPLAY_TZ``.
    """
    with session_scope(config.DATABASE_URL) as db:
        duty_text = get_duty_message_text(db, config.DUTY_DISPLAY_TZ, lang)
        return duty_text
|
|
|
|
|
|
def _get_next_shift_end_sync():
    """Return whatever ``get_next_shift_end_utc`` reports (blocking DB access).

    Callers in this module pass the result straight to
    ``_schedule_next_update`` as ``when_utc``.
    """
    with session_scope(config.DATABASE_URL) as db:
        next_shift_end = get_next_shift_end_utc(db)
        return next_shift_end
|
|
|
|
|
|
def _sync_save_pin(chat_id: int, message_id: int) -> None:
    """Store ``message_id`` as the duty message of ``chat_id`` (blocking DB access)."""
    db_url = config.DATABASE_URL
    with session_scope(db_url) as db:
        save_pin(db, chat_id, message_id)
|
|
|
|
|
|
def _sync_delete_pin(chat_id: int) -> None:
    """Drop the pin record of ``chat_id`` via ``delete_pin`` (blocking DB access)."""
    db_url = config.DATABASE_URL
    with session_scope(db_url) as db:
        delete_pin(db, chat_id)
|
|
|
|
|
|
def _sync_get_message_id(chat_id: int) -> int | None:
    """Return the stored duty message id for ``chat_id`` (blocking DB access).

    The value comes from ``get_message_id``; callers in this module treat
    ``None`` as "no pin record for this chat".
    """
    with session_scope(config.DATABASE_URL) as db:
        stored_message_id = get_message_id(db, chat_id)
        return stored_message_id
|
|
|
|
|
|
def _sync_is_trusted(chat_id: int) -> bool:
    """Report whether ``chat_id`` is a trusted group (blocking wrapper for handlers)."""
    with session_scope(config.DATABASE_URL) as db:
        trusted = is_group_trusted(db, chat_id)
        return trusted
|
|
|
|
|
|
def _sync_trust_group(chat_id: int, added_by_user_id: int | None) -> bool:
    """Put a group on the trusted list unless it is already there.

    Args:
        chat_id: Group to trust.
        added_by_user_id: Forwarded to ``trust_group`` together with the chat id.

    Returns:
        True when the group was already trusted (``trust_group`` is not
        called), False when this call added it.
    """
    with session_scope(config.DATABASE_URL) as db:
        if is_group_trusted(db, chat_id):
            was_already_trusted = True
        else:
            trust_group(db, chat_id, added_by_user_id)
            was_already_trusted = False
        return was_already_trusted
|
|
|
|
|
|
def _get_contact_button_markup(lang: str) -> InlineKeyboardMarkup | None:
    """Build the one-button 'View contacts' keyboard, or None without BOT_USERNAME.

    The button carries a plain ``url`` pointing at a t.me Mini App deep link so
    the app opens inside Telegram. ``web_app`` buttons are deliberately not
    used: they are allowed only in private chats, and in groups Telegram
    answers Button_type_invalid. A URL button works everywhere.

    Args:
        lang: Language code used to translate the button label.

    Returns:
        An ``InlineKeyboardMarkup`` with a single button, or ``None`` when
        ``config.BOT_USERNAME`` is empty/unset.
    """
    bot_username = config.BOT_USERNAME
    if bot_username:
        deep_link = f"https://t.me/{bot_username}?startapp=duty"
        label = t(lang, "pin_duty.view_contacts")
        contact_button = InlineKeyboardButton(text=label, url=deep_link)
        return InlineKeyboardMarkup([[contact_button]])
    return None
|
|
|
|
|
|
def _sync_untrust_group(chat_id: int) -> tuple[bool, int | None]:
    """Take a group off the trusted list and drop its pin record.

    All reads and writes happen inside one DB session.

    Returns:
        ``(was_trusted, message_id)``. ``was_trusted`` is False (and nothing is
        changed) when the group was not on the list. ``message_id`` is the
        value ``get_message_id`` returned before the pin record was deleted,
        so the caller can clean the message up; it is None when there was none.
    """
    with session_scope(config.DATABASE_URL) as db:
        if is_group_trusted(db, chat_id):
            # Read the message id first: delete_pin removes the record it lives in.
            pinned_message_id = get_message_id(db, chat_id)
            delete_pin(db, chat_id)
            untrust_group(db, chat_id)
            return (True, pinned_message_id)
        return (False, None)
|
|
|
|
|
|
async def _schedule_next_update(
    application, chat_id: int, when_utc: datetime | None
) -> None:
    """Schedule the next pin refresh for a chat, replacing any pending one.

    Every job already registered under this chat's job name is removed first,
    so at most one refresh job per chat is scheduled by this function.

    Args:
        application: Object exposing ``job_queue``; if the queue is ``None``
            a warning is logged and nothing is scheduled.
        chat_id: Chat the refresh is for; also used to build the job name.
        when_utc: Moment of the next refresh in UTC. A naive value is taken as
            UTC; a timezone-aware value is converted to UTC first. ``None``
            means "no next shift known" and schedules a retry in
            ``RETRY_WHEN_NO_DUTY_MINUTES`` minutes.
    """
    # Local import: the module header only imports datetime and timezone.
    from datetime import timedelta

    job_queue = application.job_queue
    if job_queue is None:
        logger.warning("Job queue not available, cannot schedule pin update")
        return

    name = f"{JOB_NAME_PREFIX}{chat_id}"
    for job in job_queue.get_jobs_by_name(name):
        job.schedule_removal()

    if when_utc is None:
        job_queue.run_once(
            update_group_pin,
            when=timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES),
            data={"chat_id": chat_id},
            name=name,
        )
        logger.info(
            "No next shift for chat_id=%s; scheduled retry in %s min",
            chat_id,
            RETRY_WHEN_NO_DUTY_MINUTES,
        )
        return

    # Compare naive-UTC with naive-UTC; an aware input would otherwise raise
    # TypeError on subtraction.
    target = when_utc
    if target.tzinfo is not None:
        target = target.astimezone(timezone.utc).replace(tzinfo=None)
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    # Never schedule in the past or "immediately": wait at least one second.
    delay = max(target - now_utc, timedelta(seconds=1))
    job_queue.run_once(
        update_group_pin,
        when=delay,
        data={"chat_id": chat_id},
        name=name,
    )
    logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc)
|
|
|
|
|
|
async def _refresh_pin_for_chat(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int
) -> Literal["updated", "no_message", "failed", "untrusted"]:
    """Replace the pinned duty message of a chat with a freshly sent one.

    Sequence for a trusted chat with a pin record: send the new message,
    unpin the previously recorded message (best effort), pin the new one,
    store its id, delete the old message, then schedule the next refresh.

    The unpin targets the recorded message id explicitly. Without a message id
    Telegram unpins the most recent pinned message, which is not necessarily
    the bot's. A failed unpin (for example when the old message was already
    removed) no longer prevents pinning and saving the new message.

    If the group is no longer trusted, the pin record, the scheduled job and
    the recorded message are cleaned up instead.

    Args:
        context: Handler/job context providing ``bot`` and ``application``.
        chat_id: Chat to refresh.

    Returns:
        "updated" if the message was sent, pinned and saved;
        "no_message" if there is no pin record for this chat;
        "failed" if sending or pinning raised BadRequest/Forbidden (the next
        update is still scheduled);
        "untrusted" if the group is not trusted (record and message cleaned up).
    """
    loop = asyncio.get_running_loop()
    bot = context.bot

    trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id)
    if not trusted:
        stale_message_id = await loop.run_in_executor(
            None, _sync_get_message_id, chat_id
        )
        await loop.run_in_executor(None, _sync_delete_pin, chat_id)
        name = f"{JOB_NAME_PREFIX}{chat_id}"
        if context.application.job_queue:
            for job in context.application.job_queue.get_jobs_by_name(name):
                job.schedule_removal()
        if stale_message_id is not None:
            # Both steps are best effort: the bot may lack rights or the
            # message may already be gone.
            try:
                await bot.unpin_chat_message(
                    chat_id=chat_id, message_id=stale_message_id
                )
            except (BadRequest, Forbidden):
                pass
            try:
                await bot.delete_message(
                    chat_id=chat_id, message_id=stale_message_id
                )
            except (BadRequest, Forbidden):
                pass
        logger.info("Chat_id=%s no longer trusted, removed pin record and job", chat_id)
        return "untrusted"

    old_message_id, text, next_end = await loop.run_in_executor(
        None,
        lambda: _sync_get_pin_refresh_data(chat_id, config.DEFAULT_LANGUAGE),
    )
    if old_message_id is None:
        logger.info("No pin record for chat_id=%s, skipping update", chat_id)
        return "no_message"

    try:
        msg = await bot.send_message(
            chat_id=chat_id,
            text=text,
            reply_markup=_get_contact_button_markup(config.DEFAULT_LANGUAGE),
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to send duty message for pin refresh chat_id=%s: %s", chat_id, e
        )
        await _schedule_next_update(context.application, chat_id, next_end)
        return "failed"

    # Best effort: only our own previous message is unpinned, and a failure
    # here must not block pinning the new message.
    try:
        await bot.unpin_chat_message(chat_id=chat_id, message_id=old_message_id)
    except (BadRequest, Forbidden) as e:
        logger.info(
            "Could not unpin old message %s in chat_id=%s: %s",
            old_message_id,
            chat_id,
            e,
        )

    try:
        await bot.pin_chat_message(
            chat_id=chat_id,
            message_id=msg.message_id,
            disable_notification=not config.DUTY_PIN_NOTIFY,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("Pin after refresh failed chat_id=%s: %s", chat_id, e)
        await _schedule_next_update(context.application, chat_id, next_end)
        return "failed"

    await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)

    # old_message_id is known to be non-None here (checked above).
    try:
        await bot.delete_message(chat_id=chat_id, message_id=old_message_id)
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Could not delete old pinned message %s in chat_id=%s: %s",
            old_message_id,
            chat_id,
            e,
        )

    await _schedule_next_update(context.application, chat_id, next_end)
    return "updated"
|
|
|
|
|
|
async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Job callback: refresh the pinned duty message of the job's chat.

    The chat id is read from ``context.job.data["chat_id"]``; when the key is
    missing or None the callback does nothing. Scheduling of the following
    update is done by ``_refresh_pin_for_chat``.
    """
    job_payload = context.job.data
    target_chat_id = job_payload.get("chat_id")
    if target_chat_id is not None:
        await _refresh_pin_for_chat(context, target_chat_id)
|
|
|
|
|
|
async def my_chat_member_handler(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """React to the bot's own membership changes in groups and supergroups.

    * Bot added (status LEFT/BANNED -> MEMBER/ADMINISTRATOR): in an untrusted
      group only a "not trusted" notice is sent. In a trusted group the duty
      message is sent, pinned if possible, stored as the chat's pin record,
      and the next refresh is scheduled. If pinning fails the group is asked
      to make the bot an admin.
    * Bot removed (new status LEFT/BANNED): the pin record is deleted and the
      chat's scheduled refresh jobs are removed.

    Updates without ``my_chat_member`` or ``effective_user``, from other chat
    types, or about a different user are ignored.
    """
    membership = update.my_chat_member
    actor = update.effective_user
    if not membership or not actor:
        return

    chat = update.effective_chat
    if not chat or chat.type not in ("group", "supergroup"):
        return

    before = membership.old_chat_member
    after = membership.new_chat_member
    if after.user.id != context.bot.id:
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()
    absent_statuses = (ChatMemberStatus.LEFT, ChatMemberStatus.BANNED)
    present_statuses = (ChatMemberStatus.MEMBER, ChatMemberStatus.ADMINISTRATOR)

    # --- Bot was removed from the group -------------------------------------
    if after.status in absent_statuses:
        await loop.run_in_executor(None, _sync_delete_pin, chat_id)
        job_queue = context.application.job_queue
        if job_queue:
            for job in job_queue.get_jobs_by_name(f"{JOB_NAME_PREFIX}{chat_id}"):
                job.schedule_removal()
        logger.info("Bot left chat_id=%s, removed pin record and jobs", chat_id)
        return

    # --- Anything other than "was absent, is now present" is ignored --------
    if after.status not in present_statuses or before.status not in absent_statuses:
        return

    # --- Bot was added to the group -----------------------------------------
    trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id)
    lang = get_lang(actor)
    if not trusted:
        try:
            await context.bot.send_message(
                chat_id=chat_id,
                text=t(lang, "group.not_trusted"),
            )
        except (BadRequest, Forbidden):
            pass
        return

    text = await loop.run_in_executor(None, _get_duty_message_text_sync, lang)
    try:
        msg = await context.bot.send_message(
            chat_id=chat_id,
            text=text,
            reply_markup=_get_contact_button_markup(lang),
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("Failed to send duty message in chat_id=%s: %s", chat_id, e)
        return

    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=msg.message_id,
            disable_notification=True,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("Failed to pin message in chat_id=%s: %s", chat_id, e)
        pinned = False
    else:
        pinned = True

    # The message id is recorded even when pinning failed.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)

    if not pinned:
        try:
            await context.bot.send_message(
                chat_id=chat_id,
                text=t(lang, "pin_duty.could_not_pin_make_admin"),
            )
        except (BadRequest, Forbidden):
            pass

    next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, next_end)
|
|
|
|
|
|
def _get_all_pin_chat_ids_sync() -> list[int]:
    """Return the chat ids reported by ``get_all_pin_chat_ids`` (blocking DB access)."""
    with session_scope(config.DATABASE_URL) as db:
        pinned_chat_ids = get_all_pin_chat_ids(db)
        return pinned_chat_ids
|
|
|
|
|
|
async def restore_group_pin_jobs(application) -> None:
    """Re-create the pin refresh job for every chat that has a pin record.

    Intended for startup. The next shift end is looked up once and the same
    value is used for all chats.

    Args:
        application: Passed to ``_schedule_next_update`` for each chat.
    """
    loop = asyncio.get_running_loop()
    pinned_chats = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)

    restored = 0
    for pinned_chat_id in pinned_chats:
        await _schedule_next_update(application, pinned_chat_id, shift_end)
        restored += 1

    logger.info("Restored %s group pin jobs", restored)
|
|
|
|
|
|
async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /pin_duty: pin the duty message in a trusted group.

    * Outside groups/supergroups, or in an untrusted group, only an explanatory
      reply is sent.
    * If the chat already has a pin record, that message is pinned again.
    * Otherwise a new duty message is sent, pinned if possible, recorded, and
      the next refresh is scheduled. When pinning fails the user is asked to
      make the bot an admin.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "pin_duty.group_only"))
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()

    if not await loop.run_in_executor(None, _sync_is_trusted, chat_id):
        await message.reply_text(t(lang, "group.not_trusted"))
        return

    recorded_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)

    # --- A duty message is already recorded: just pin it again --------------
    if recorded_id is not None:
        try:
            await context.bot.pin_chat_message(
                chat_id=chat_id,
                message_id=recorded_id,
                disable_notification=True,
            )
            await message.reply_text(t(lang, "pin_duty.pinned"))
        except (BadRequest, Forbidden) as e:
            logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e)
            await message.reply_text(t(lang, "pin_duty.failed"))
        return

    # --- No record yet: send a fresh duty message and try to pin it ---------
    text = await loop.run_in_executor(None, _get_duty_message_text_sync, lang)
    try:
        msg = await context.bot.send_message(
            chat_id=chat_id,
            text=text,
            reply_markup=_get_contact_button_markup(lang),
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to send duty message for pin_duty chat_id=%s: %s", chat_id, e
        )
        await message.reply_text(t(lang, "pin_duty.failed"))
        return

    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=msg.message_id,
            disable_notification=True,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to pin message for pin_duty chat_id=%s: %s", chat_id, e
        )
        pinned = False
    else:
        pinned = True

    # The message id is recorded and the refresh scheduled even if pinning failed.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)
    next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, next_end)

    outcome_key = "pin_duty.pinned" if pinned else "pin_duty.could_not_pin_make_admin"
    await message.reply_text(t(lang, outcome_key))
|
|
|
|
|
|
async def refresh_pin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /refresh_pin: refresh the pinned duty message right away.

    Works only in trusted groups/supergroups; otherwise an explanatory reply
    is sent. The reply after a refresh uses the translation key
    ``refresh_pin.<result>``, where ``<result>`` is the value returned by
    ``_refresh_pin_for_chat``.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "refresh_pin.group_only"))
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()
    if not await loop.run_in_executor(None, _sync_is_trusted, chat_id):
        await message.reply_text(t(lang, "group.not_trusted"))
        return

    outcome = await _refresh_pin_for_chat(context, chat_id)
    await message.reply_text(t(lang, f"refresh_pin.{outcome}"))
|
|
|
|
|
|
async def trust_group_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /trust_group: add the current group to the trusted list (admin only).

    Non-group chats and non-admin users get an explanatory reply. If the group
    is already trusted only that fact is reported. After a group is newly
    trusted and has no pin record yet, the duty message is sent, pinned if
    possible, recorded, and the next refresh is scheduled.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "trust_group.group_only"))
        return

    if not await is_admin_async(user.id):
        await message.reply_text(t(lang, "import.admin_only"))
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()

    # The user was verified above, so the id is always available here.
    already_trusted = await loop.run_in_executor(
        None, _sync_trust_group, chat_id, user.id
    )
    if already_trusted:
        await message.reply_text(t(lang, "trust_group.already_trusted"))
        return
    await message.reply_text(t(lang, "trust_group.added"))

    recorded_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    if recorded_id is not None:
        # A pin record already exists; nothing more to send.
        return

    text = await loop.run_in_executor(None, _get_duty_message_text_sync, lang)
    try:
        msg = await context.bot.send_message(
            chat_id=chat_id,
            text=text,
            reply_markup=_get_contact_button_markup(lang),
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to send duty message after trust_group chat_id=%s: %s",
            chat_id,
            e,
        )
        return

    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=msg.message_id,
            disable_notification=not config.DUTY_PIN_NOTIFY,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to pin message after trust_group chat_id=%s: %s", chat_id, e
        )

    await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)

    # NOTE(review): scheduling is assumed to belong to the "no pin record yet"
    # path, as in pin_duty_cmd — confirm against the original indentation.
    next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, next_end)
|
|
|
|
|
|
async def untrust_group_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /untrust_group: remove the current group from the trusted list (admin only).

    Non-group chats and non-admin users get an explanatory reply. For a trusted
    group the trust entry and pin record are removed, the chat's scheduled
    refresh jobs are cancelled, and the recorded duty message is unpinned and
    deleted on a best-effort basis.

    The unpin names the recorded message id explicitly. Without a message id
    Telegram unpins the most recent pinned message, which may belong to
    someone else.
    """
    if not update.message or not update.effective_chat or not update.effective_user:
        return
    chat = update.effective_chat
    lang = get_lang(update.effective_user)
    if chat.type not in ("group", "supergroup"):
        await update.message.reply_text(t(lang, "untrust_group.group_only"))
        return
    if not await is_admin_async(update.effective_user.id):
        await update.message.reply_text(t(lang, "import.admin_only"))
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()
    was_trusted, message_id = await loop.run_in_executor(
        None, _sync_untrust_group, chat_id
    )
    if not was_trusted:
        await update.message.reply_text(t(lang, "untrust_group.not_trusted"))
        return

    # Cancel pending refreshes for this chat.
    name = f"{JOB_NAME_PREFIX}{chat_id}"
    if context.application.job_queue:
        for job in context.application.job_queue.get_jobs_by_name(name):
            job.schedule_removal()

    if message_id is not None:
        # Best effort: the bot may lack rights or the message may be gone.
        try:
            await context.bot.unpin_chat_message(
                chat_id=chat_id, message_id=message_id
            )
        except (BadRequest, Forbidden):
            pass
        try:
            await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
        except (BadRequest, Forbidden):
            pass

    await update.message.reply_text(t(lang, "untrust_group.removed"))
|
|
|
|
|
|
# Fires on changes to the bot's own chat membership.
group_duty_pin_handler = ChatMemberHandler(
    callback=my_chat_member_handler,
    chat_member_types=ChatMemberHandler.MY_CHAT_MEMBER,
)

# Command handlers, one per slash command defined in this module.
pin_duty_handler = CommandHandler(command="pin_duty", callback=pin_duty_cmd)
refresh_pin_handler = CommandHandler(command="refresh_pin", callback=refresh_pin_cmd)
trust_group_handler = CommandHandler(command="trust_group", callback=trust_group_cmd)
untrust_group_handler = CommandHandler(
    command="untrust_group", callback=untrust_group_cmd
)
|