Files
duty-teller/duty_teller/handlers/group_duty_pin.py
Nikolay Tatarinov 87e8417675 refactor: improve code readability and structure in various components
- Refactored the `mini_app_short_name` assignment in `config.py` for better clarity.
- Enhanced the `app_config_js` function in `app.py` to improve formatting of the JavaScript response body.
- Added per-chat locks in `group_duty_pin.py` to prevent concurrent refreshes, improving message handling.
- Updated `_schedule_next_update` to include optional jitter for scheduling, enhancing performance during high-load scenarios.
- Cleaned up test files by removing unused imports and improving test descriptions for clarity.
2026-03-03 17:52:23 +03:00

569 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Pinned duty message in groups: handle bot add/remove, schedule updates at shift end."""
import asyncio
import logging
import random
from datetime import datetime, timedelta, timezone
from typing import Literal
import duty_teller.config as config
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.constants import ChatMemberStatus
from telegram.error import BadRequest, Forbidden
from telegram.ext import ChatMemberHandler, CommandHandler, ContextTypes
from duty_teller.db.session import session_scope
from duty_teller.i18n import get_lang, t
from duty_teller.handlers.common import is_admin_async
from duty_teller.services.group_duty_pin_service import (
get_duty_message_text,
get_message_id,
get_next_shift_end_utc,
get_pin_refresh_data,
save_pin,
delete_pin,
get_all_pin_chat_ids,
is_group_trusted,
trust_group,
untrust_group,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Per-chat locks to prevent concurrent refresh for the same chat (avoids duplicate messages).
# Keyed by chat_id; entries are created on demand by the refresh routine.
_refresh_locks: dict[int, asyncio.Lock] = {}
# Held while entries of _refresh_locks are looked up, created or removed.
_lock_for_refresh_locks = asyncio.Lock()
# Job names are built as JOB_NAME_PREFIX + chat_id, so each chat has its own named job.
JOB_NAME_PREFIX = "duty_pin_"
# Delay (minutes) before retrying a pin update when no next shift end is known.
RETRY_WHEN_NO_DUTY_MINUTES = 15
def _sync_get_pin_refresh_data(
    chat_id: int, lang: str = "en"
) -> tuple[int | None, str, datetime | None]:
    """Load everything a pin refresh needs within one DB session.

    Blocking wrapper around ``get_pin_refresh_data``; per the return
    annotation the result is ``(message_id, duty text, next shift end)``.
    """
    with session_scope(config.DATABASE_URL) as db:
        refresh_data = get_pin_refresh_data(
            db, chat_id, config.DUTY_DISPLAY_TZ, lang
        )
        return refresh_data
def _get_duty_message_text_sync(lang: str = "en") -> str:
    """Blocking wrapper: build the duty message text for ``lang`` in its own DB session."""
    with session_scope(config.DATABASE_URL) as db:
        duty_text = get_duty_message_text(db, config.DUTY_DISPLAY_TZ, lang)
        return duty_text
def _get_next_shift_end_sync():
    """Blocking wrapper: return whatever ``get_next_shift_end_utc`` reports, using its own DB session."""
    with session_scope(config.DATABASE_URL) as db:
        shift_end = get_next_shift_end_utc(db)
        return shift_end
def _sync_save_pin(chat_id: int, message_id: int) -> None:
    """Blocking wrapper: store ``message_id`` as the pin record of ``chat_id``."""
    with session_scope(config.DATABASE_URL) as db:
        save_pin(db, chat_id, message_id)
def _sync_delete_pin(chat_id: int) -> None:
    """Blocking wrapper: remove the pin record of ``chat_id``."""
    with session_scope(config.DATABASE_URL) as db:
        delete_pin(db, chat_id)
def _sync_get_message_id(chat_id: int) -> int | None:
    """Blocking wrapper: look up the stored pin message id of ``chat_id`` (``None`` per the annotation when absent)."""
    with session_scope(config.DATABASE_URL) as db:
        stored_id = get_message_id(db, chat_id)
        return stored_id
def _sync_is_trusted(chat_id: int) -> bool:
    """Blocking wrapper for handlers: report whether ``chat_id`` is a trusted group."""
    with session_scope(config.DATABASE_URL) as db:
        trusted = is_group_trusted(db, chat_id)
        return trusted
def _sync_trust_group(chat_id: int, added_by_user_id: int | None) -> bool:
    """Mark the group as trusted unless it already is.

    Returns:
        True when the group was already trusted (nothing is written),
        False when it has just been added.
    """
    with session_scope(config.DATABASE_URL) as db:
        if not is_group_trusted(db, chat_id):
            trust_group(db, chat_id, added_by_user_id)
            return False
        return True
def _get_contact_button_markup(lang: str) -> InlineKeyboardMarkup | None:
    """Build the one-button 'View contacts' keyboard, or None when BOT_USERNAME is unset.

    The button is a plain ``url`` button pointing at a t.me deep link rather
    than a ``web_app`` button: web_app inline buttons are only allowed in
    private chats, and Telegram answers Button_type_invalid in groups.

    With MINI_APP_SHORT_NAME configured the link targets the Mini App directly
    (``/<short>?startapp=duty``) so it opens on the current duty view.
    Without it the link targets the bot itself with ``?startapp=duty``; the
    user may then land in the bot chat, and some clients do not pass
    start_param when the app is opened from the menu.
    """
    bot_username = config.BOT_USERNAME
    if not bot_username:
        return None

    link = f"https://t.me/{bot_username}"
    app_short_name = (config.MINI_APP_SHORT_NAME or "").strip().strip("/")
    if app_short_name:
        link = f"{link}/{app_short_name}"
    link = f"{link}?startapp=duty"

    contacts_button = InlineKeyboardButton(
        text=t(lang, "pin_duty.view_contacts"),
        url=link,
    )
    keyboard_rows = [[contacts_button]]
    return InlineKeyboardMarkup(keyboard_rows)
def _sync_untrust_group(chat_id: int) -> tuple[bool, int | None]:
    """Drop the group from the trusted list together with its pin record.

    Returns:
        ``(False, None)`` when the group was not trusted (nothing is changed);
        otherwise ``(True, message_id)`` where ``message_id`` is the stored
        pin message id read before deletion (None if there was none), so the
        caller can clean the message up.
    """
    with session_scope(config.DATABASE_URL) as db:
        if is_group_trusted(db, chat_id):
            # Read the id first: the pin record is deleted right after.
            pinned_message_id = get_message_id(db, chat_id)
            delete_pin(db, chat_id)
            untrust_group(db, chat_id)
            return (True, pinned_message_id)
        return (False, None)
async def _schedule_next_update(
    application,
    chat_id: int,
    when_utc: datetime | None,
    jitter_seconds: float | None = None,
) -> None:
    """Schedule the next pin refresh job for a chat, replacing any existing one.

    Args:
        application: Object exposing ``job_queue``; if that is None only a
            warning is logged.
        chat_id: Chat whose job (named ``JOB_NAME_PREFIX + chat_id``) is (re)scheduled.
        when_utc: Moment of the next refresh. A naive value is interpreted as
            UTC; a timezone-aware value is also accepted and compared
            correctly. None means "no next shift known": a retry is scheduled
            in RETRY_WHEN_NO_DUTY_MINUTES minutes instead.
        jitter_seconds: When positive, a random extra delay in
            ``[0, jitter_seconds]`` is added (only if ``when_utc`` is given) to
            spread jobs when many chats are scheduled at once.
    """
    job_queue = application.job_queue
    if job_queue is None:
        logger.warning("Job queue not available, cannot schedule pin update")
        return

    name = f"{JOB_NAME_PREFIX}{chat_id}"
    # At most one pending refresh job per chat: drop whatever is queued under this name.
    for job in job_queue.get_jobs_by_name(name):
        job.schedule_removal()

    if when_utc is None:
        delay = timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES)
    else:
        # Compare in aware UTC so both naive-UTC and tz-aware inputs work;
        # mixing naive and aware datetimes in a subtraction raises TypeError.
        if when_utc.utcoffset() is None:
            target = when_utc.replace(tzinfo=timezone.utc)
        else:
            target = when_utc
        delay = target - datetime.now(timezone.utc)
        if jitter_seconds is not None and jitter_seconds > 0:
            delay += timedelta(seconds=random.uniform(0, jitter_seconds))
        # A moment in the past (or right now) still runs, one second from now.
        if delay.total_seconds() < 1:
            delay = timedelta(seconds=1)

    job_queue.run_once(
        update_group_pin,
        when=delay,
        data={"chat_id": chat_id},
        name=name,
    )

    if when_utc is None:
        logger.info(
            "No next shift for chat_id=%s; scheduled retry in %s min",
            chat_id,
            RETRY_WHEN_NO_DUTY_MINUTES,
        )
    else:
        logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc)
# Number of coroutines currently holding or waiting for each per-chat refresh lock.
# A lock is removed from _refresh_locks only when this count drops to zero, so
# every concurrent refresh of one chat is guaranteed to share the same lock.
_refresh_lock_users: dict[int, int] = {}


async def _refresh_pin_for_chat(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int
) -> Literal["updated", "no_message", "failed", "untrusted"]:
    """Refresh pinned duty message: send new message, unpin old, pin new, save new message_id, delete old.

    The whole refresh (trust check, reading the pin record, Telegram calls and
    saving) runs under a per-chat lock. A refresh that had to wait therefore
    sees the message_id saved by the previous one instead of a stale id, which
    would otherwise leave a duplicate message behind.

    If the group is no longer trusted, removes pin record, job, and message.
    Unpin is best-effort (e.g. if the user already unpinned we still pin the
    new message and save state).

    Returns:
        "updated" if the message was sent, pinned and saved successfully;
        "no_message" if there is no pin record for this chat;
        "failed" if send_message or pin failed;
        "untrusted" if the group was removed from trusted list (pin record and message cleaned up).
    """
    async with _lock_for_refresh_locks:
        lock = _refresh_locks.setdefault(chat_id, asyncio.Lock())
        _refresh_lock_users[chat_id] = _refresh_lock_users.get(chat_id, 0) + 1
    try:
        async with lock:
            return await _refresh_pin_for_chat_locked(context, chat_id)
    finally:
        async with _lock_for_refresh_locks:
            remaining = _refresh_lock_users.get(chat_id, 1) - 1
            if remaining > 0:
                # Someone is still waiting on this lock: keep it registered so
                # later callers queue behind them instead of getting a new lock.
                _refresh_lock_users[chat_id] = remaining
            else:
                _refresh_lock_users.pop(chat_id, None)
                _refresh_locks.pop(chat_id, None)


async def _remove_untrusted_pin(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int
) -> None:
    """Drop pin record and job of a chat that is not trusted; unpin/delete its message best-effort."""
    loop = asyncio.get_running_loop()
    old_message_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    await loop.run_in_executor(None, _sync_delete_pin, chat_id)
    name = f"{JOB_NAME_PREFIX}{chat_id}"
    if context.application.job_queue:
        for job in context.application.job_queue.get_jobs_by_name(name):
            job.schedule_removal()
    if old_message_id is not None:
        try:
            await context.bot.unpin_chat_message(chat_id=chat_id)
        except (BadRequest, Forbidden) as e:
            logger.debug("Unpin during untrusted cleanup failed chat_id=%s: %s", chat_id, e)
        try:
            await context.bot.delete_message(
                chat_id=chat_id, message_id=old_message_id
            )
        except (BadRequest, Forbidden) as e:
            logger.debug(
                "Delete during untrusted cleanup failed chat_id=%s: %s", chat_id, e
            )
    logger.info("Chat_id=%s no longer trusted, removed pin record and job", chat_id)


async def _refresh_pin_for_chat_locked(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int
) -> Literal["updated", "no_message", "failed", "untrusted"]:
    """Do one pin refresh; the caller must hold the per-chat refresh lock."""
    loop = asyncio.get_running_loop()
    trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id)
    if not trusted:
        await _remove_untrusted_pin(context, chat_id)
        return "untrusted"

    # message_id, text and next shift end come from a single DB session.
    old_message_id, text, next_end = await loop.run_in_executor(
        None,
        lambda: _sync_get_pin_refresh_data(chat_id, config.DEFAULT_LANGUAGE),
    )
    if old_message_id is None:
        logger.info("No pin record for chat_id=%s, skipping update", chat_id)
        return "no_message"

    try:
        msg = await context.bot.send_message(
            chat_id=chat_id,
            text=text,
            reply_markup=_get_contact_button_markup(config.DEFAULT_LANGUAGE),
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to send duty message for pin refresh chat_id=%s: %s",
            chat_id,
            e,
        )
        await _schedule_next_update(context.application, chat_id, next_end)
        return "failed"

    try:
        await context.bot.unpin_chat_message(chat_id=chat_id)
    except (BadRequest, Forbidden) as e:
        logger.debug(
            "Unpin failed (e.g. no pinned message) chat_id=%s: %s", chat_id, e
        )

    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=msg.message_id,
            disable_notification=not config.DUTY_PIN_NOTIFY,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("Pin after refresh failed chat_id=%s: %s", chat_id, e)
        await _schedule_next_update(context.application, chat_id, next_end)
        return "failed"

    # Persist the new id before deleting the old message, so the record never
    # points at a message that no longer exists.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)
    try:
        await context.bot.delete_message(chat_id=chat_id, message_id=old_message_id)
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Could not delete old pinned message %s in chat_id=%s: %s",
            old_message_id,
            chat_id,
            e,
        )
    await _schedule_next_update(context.application, chat_id, next_end)
    return "updated"
async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Job callback: refresh pinned duty message and schedule next update at shift end.

    Reads ``chat_id`` from the job's data and does nothing when it is missing.
    The refresh itself schedules the next run on its normal paths. If it
    raises instead, a retry is scheduled in RETRY_WHEN_NO_DUTY_MINUTES minutes
    so the one-shot job chain is not lost. The exception is then re-raised so
    it is still reported to the caller.
    """
    chat_id = context.job.data.get("chat_id")
    if chat_id is None:
        return
    try:
        await _refresh_pin_for_chat(context, chat_id)
    except Exception:
        logger.exception(
            "Pin refresh crashed for chat_id=%s; scheduling retry", chat_id
        )
        # when_utc=None makes the scheduler use its fixed retry delay.
        await _schedule_next_update(context.application, chat_id, None)
        raise
async def my_chat_member_handler(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """React to the bot's own membership changes in groups and supergroups.

    * Bot left or was banned: delete the pin record and the chat's scheduled jobs.
    * Bot went from left/banned to member/administrator: in an untrusted group
      post the "not trusted" notice; in a trusted one send the duty message,
      try to pin it silently, save its id, tell the group if pinning failed,
      and schedule the next update.

    Updates about other users, other chat types or other transitions are ignored.
    """
    membership = update.my_chat_member
    if not membership or not update.effective_user:
        return
    previous = membership.old_chat_member
    current = membership.new_chat_member
    chat = update.effective_chat
    if not chat or chat.type not in ("group", "supergroup"):
        return
    bot = context.bot
    if current.user.id != bot.id:
        return

    chat_id = chat.id
    absent = (ChatMemberStatus.LEFT, ChatMemberStatus.BANNED)
    present = (ChatMemberStatus.MEMBER, ChatMemberStatus.ADMINISTRATOR)

    if current.status in absent:
        await asyncio.get_running_loop().run_in_executor(
            None, _sync_delete_pin, chat_id
        )
        job_name = f"{JOB_NAME_PREFIX}{chat_id}"
        queue = context.application.job_queue
        if queue:
            for pending in queue.get_jobs_by_name(job_name):
                pending.schedule_removal()
        logger.info("Bot left chat_id=%s, removed pin record and jobs", chat_id)
        return

    # Only a fresh join (absent -> present) is handled below.
    if current.status not in present or previous.status not in absent:
        return

    loop = asyncio.get_running_loop()
    trusted = await loop.run_in_executor(None, _sync_is_trusted, chat_id)
    lang = get_lang(update.effective_user)
    if not trusted:
        try:
            await bot.send_message(
                chat_id=chat_id,
                text=t(lang, "group.not_trusted"),
            )
        except (BadRequest, Forbidden):
            pass
        return

    duty_text = await loop.run_in_executor(None, _get_duty_message_text_sync, lang)
    try:
        sent = await bot.send_message(
            chat_id=chat_id,
            text=duty_text,
            reply_markup=_get_contact_button_markup(lang),
        )
    except (BadRequest, Forbidden) as exc:
        logger.warning("Failed to send duty message in chat_id=%s: %s", chat_id, exc)
        return

    try:
        await bot.pin_chat_message(
            chat_id=chat_id,
            message_id=sent.message_id,
            disable_notification=True,
        )
    except (BadRequest, Forbidden) as exc:
        logger.warning("Failed to pin message in chat_id=%s: %s", chat_id, exc)
        pinned = False
    else:
        pinned = True

    # The message id is stored even when pinning failed.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, sent.message_id)
    if not pinned:
        try:
            await bot.send_message(
                chat_id=chat_id,
                text=t(lang, "pin_duty.could_not_pin_make_admin"),
            )
        except (BadRequest, Forbidden):
            pass

    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, shift_end)
def _get_all_pin_chat_ids_sync() -> list[int]:
    """Blocking wrapper: return the chat ids reported by ``get_all_pin_chat_ids``."""
    with session_scope(config.DATABASE_URL) as db:
        chat_ids = get_all_pin_chat_ids(db)
        return chat_ids
async def restore_group_pin_jobs(application) -> None:
    """Re-create the pin-update job of every chat that has a pin record (on startup).

    All chats are scheduled for the same next shift end, each with a random
    0-60 s jitter, to avoid a thundering herd when many groups refresh at once.
    """
    loop = asyncio.get_running_loop()
    pinned_chats = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    for pinned_chat_id in pinned_chats:
        await _schedule_next_update(
            application,
            pinned_chat_id,
            shift_end,
            jitter_seconds=60.0,
        )
    restored = len(pinned_chats)
    logger.info("Restored %s group pin jobs", restored)
async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /pin_duty in a trusted group.

    With a stored pin record the recorded message is pinned again and the next
    update is scheduled. Without one, a new duty message is sent, pinned
    (best-effort), saved and scheduled. The user is told whether pinning
    worked. Outside groups, or in untrusted groups, only an explanatory reply
    is sent.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "pin_duty.group_only"))
        return

    chat_id = chat.id
    bot = context.bot
    loop = asyncio.get_running_loop()
    if not await loop.run_in_executor(None, _sync_is_trusted, chat_id):
        await message.reply_text(t(lang, "group.not_trusted"))
        return

    stored_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    if stored_id is not None:
        # A duty message is already recorded: just pin it again.
        try:
            await bot.pin_chat_message(
                chat_id=chat_id,
                message_id=stored_id,
                disable_notification=True,
            )
            shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
            await _schedule_next_update(context.application, chat_id, shift_end)
            await message.reply_text(t(lang, "pin_duty.pinned"))
        except (BadRequest, Forbidden) as exc:
            logger.warning("pin_duty failed chat_id=%s: %s", chat_id, exc)
            await message.reply_text(t(lang, "pin_duty.failed"))
        return

    # No record yet: create the duty message first.
    duty_text = await loop.run_in_executor(None, _get_duty_message_text_sync, lang)
    try:
        sent = await bot.send_message(
            chat_id=chat_id,
            text=duty_text,
            reply_markup=_get_contact_button_markup(lang),
        )
    except (BadRequest, Forbidden) as exc:
        logger.warning(
            "Failed to send duty message for pin_duty chat_id=%s: %s", chat_id, exc
        )
        await message.reply_text(t(lang, "pin_duty.failed"))
        return

    try:
        await bot.pin_chat_message(
            chat_id=chat_id,
            message_id=sent.message_id,
            disable_notification=True,
        )
    except (BadRequest, Forbidden) as exc:
        logger.warning(
            "Failed to pin message for pin_duty chat_id=%s: %s", chat_id, exc
        )
        pinned = False
    else:
        pinned = True

    # The message id is stored and the update scheduled even when pinning failed.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, sent.message_id)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, shift_end)

    outcome_key = "pin_duty.pinned" if pinned else "pin_duty.could_not_pin_make_admin"
    await message.reply_text(t(lang, outcome_key))
async def refresh_pin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /refresh_pin: refresh the pinned duty message of a trusted group right away.

    The reply text is looked up under ``refresh_pin.<result>`` where
    ``<result>`` is the status string returned by the refresh routine.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "refresh_pin.group_only"))
        return

    chat_id = chat.id
    is_trusted = await asyncio.get_running_loop().run_in_executor(
        None, _sync_is_trusted, chat_id
    )
    if not is_trusted:
        await message.reply_text(t(lang, "group.not_trusted"))
        return

    outcome = await _refresh_pin_for_chat(context, chat_id)
    reply_key = f"refresh_pin.{outcome}"
    await message.reply_text(t(lang, reply_key))
async def trust_group_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /trust_group: add current group to trusted list (admin only).

    After a group is newly trusted and has no pin record yet, the duty message
    is sent, pinned (best-effort), saved, and the next update is scheduled.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "trust_group.group_only"))
        return
    if not await is_admin_async(user.id):
        await message.reply_text(t(lang, "import.admin_only"))
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()
    was_already_trusted = await loop.run_in_executor(
        None, _sync_trust_group, chat_id, user.id
    )
    if was_already_trusted:
        await message.reply_text(t(lang, "trust_group.already_trusted"))
        return
    await message.reply_text(t(lang, "trust_group.added"))

    stored_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    if stored_id is not None:
        # A pin record already exists; nothing more to set up.
        return

    duty_text = await loop.run_in_executor(None, _get_duty_message_text_sync, lang)
    bot = context.bot
    try:
        sent = await bot.send_message(
            chat_id=chat_id,
            text=duty_text,
            reply_markup=_get_contact_button_markup(lang),
        )
    except (BadRequest, Forbidden) as exc:
        logger.warning(
            "Failed to send duty message after trust_group chat_id=%s: %s",
            chat_id,
            exc,
        )
        return

    try:
        await bot.pin_chat_message(
            chat_id=chat_id,
            message_id=sent.message_id,
            disable_notification=not config.DUTY_PIN_NOTIFY,
        )
    except (BadRequest, Forbidden) as exc:
        logger.warning(
            "Failed to pin message after trust_group chat_id=%s: %s", chat_id, exc
        )

    # Saved and scheduled regardless of whether pinning succeeded.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, sent.message_id)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, shift_end)
async def untrust_group_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /untrust_group: remove current group from trusted list (admin only).

    On success the chat's scheduled jobs are removed, and the previously
    recorded duty message (if any) is unpinned and deleted on a best-effort
    basis before the confirmation reply is sent.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return

    lang = get_lang(user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "untrust_group.group_only"))
        return
    if not await is_admin_async(user.id):
        await message.reply_text(t(lang, "import.admin_only"))
        return

    chat_id = chat.id
    removal = await asyncio.get_running_loop().run_in_executor(
        None, _sync_untrust_group, chat_id
    )
    was_trusted, pinned_message_id = removal
    if not was_trusted:
        await message.reply_text(t(lang, "untrust_group.not_trusted"))
        return

    job_name = f"{JOB_NAME_PREFIX}{chat_id}"
    queue = context.application.job_queue
    if queue:
        for pending in queue.get_jobs_by_name(job_name):
            pending.schedule_removal()

    if pinned_message_id is not None:
        bot = context.bot
        # Both calls are best-effort: failures are deliberately ignored.
        try:
            await bot.unpin_chat_message(chat_id=chat_id)
        except (BadRequest, Forbidden):
            pass
        try:
            await bot.delete_message(chat_id=chat_id, message_id=pinned_message_id)
        except (BadRequest, Forbidden):
            pass

    await message.reply_text(t(lang, "untrust_group.removed"))
# Handler objects built at import time for registration by the application setup.

# Fires on changes of the bot's own chat membership.
group_duty_pin_handler = ChatMemberHandler(
    callback=my_chat_member_handler,
    chat_member_types=ChatMemberHandler.MY_CHAT_MEMBER,
)

# Slash commands.
pin_duty_handler = CommandHandler(command="pin_duty", callback=pin_duty_cmd)
refresh_pin_handler = CommandHandler(command="refresh_pin", callback=refresh_pin_cmd)
trust_group_handler = CommandHandler(command="trust_group", callback=trust_group_cmd)
untrust_group_handler = CommandHandler(
    command="untrust_group", callback=untrust_group_cmd
)