- Updated `pin_duty_cmd` to handle the case where no message ID is found: it now sends a new duty message, pins it, saves the pin, and schedules the next update.
- Improved error handling for message sending and pinning operations, providing appropriate replies based on success or failure.
- Enhanced unit tests to cover the new behavior, ensuring proper functionality and error handling in various scenarios.
310 lines
12 KiB
Python
310 lines
12 KiB
Python
"""Pinned duty message in groups: handle bot add/remove, schedule updates at shift end."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Literal
|
|
|
|
import duty_teller.config as config
|
|
from telegram import Update
|
|
from telegram.constants import ChatMemberStatus
|
|
from telegram.error import BadRequest, Forbidden
|
|
from telegram.ext import ChatMemberHandler, CommandHandler, ContextTypes
|
|
|
|
from duty_teller.db.session import session_scope
|
|
from duty_teller.i18n import get_lang, t
|
|
from duty_teller.services.group_duty_pin_service import (
|
|
get_duty_message_text,
|
|
get_message_id,
|
|
get_next_shift_end_utc,
|
|
get_pin_refresh_data,
|
|
save_pin,
|
|
delete_pin,
|
|
get_all_pin_chat_ids,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Job names are this prefix followed by the chat id, so the pending job of a
# chat can be looked up (and removed) by name.
JOB_NAME_PREFIX: str = "duty_pin_"

# Minutes to wait before the next refresh attempt when no next shift end is known.
RETRY_WHEN_NO_DUTY_MINUTES: int = 15
|
|
|
|
|
|
def _sync_get_pin_refresh_data(
    chat_id: int, lang: str = "en"
) -> tuple[int | None, str, datetime | None]:
    """Fetch everything a pin refresh needs using a single DB session.

    Blocking helper (meant to be run in an executor).

    Args:
        chat_id: Chat whose pin record is looked up.
        lang: Language code forwarded to the service for the duty text.

    Returns:
        Whatever ``get_pin_refresh_data`` returns; per the annotation a
        ``(message_id, duty_text, next_shift_end)`` tuple.
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_pin_refresh_data(db, chat_id, config.DUTY_DISPLAY_TZ, lang)
|
|
|
|
|
|
def _get_duty_message_text_sync(lang: str = "en") -> str:
    """Return the duty message text for ``lang`` (blocking DB access).

    Opens a session on ``config.DATABASE_URL`` and delegates to
    ``get_duty_message_text`` with ``config.DUTY_DISPLAY_TZ``.
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_duty_message_text(db, config.DUTY_DISPLAY_TZ, lang)
|
|
|
|
|
|
def _get_next_shift_end_sync():
    """Return the result of ``get_next_shift_end_utc`` (blocking DB access).

    Callers in this module pass the value to ``_schedule_next_update`` as
    ``when_utc``, so it is presumably a ``datetime`` or ``None`` - confirm
    against the service implementation.
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_next_shift_end_utc(db)
|
|
|
|
|
|
def _sync_save_pin(chat_id: int, message_id: int) -> None:
    """Store ``message_id`` as the pin record for ``chat_id`` (blocking DB access)."""
    with session_scope(config.DATABASE_URL) as db:
        save_pin(db, chat_id, message_id)
|
|
|
|
|
|
def _sync_delete_pin(chat_id: int) -> None:
    """Delete the pin record for ``chat_id`` (blocking DB access)."""
    with session_scope(config.DATABASE_URL) as db:
        delete_pin(db, chat_id)
|
|
|
|
|
|
def _sync_get_message_id(chat_id: int) -> int | None:
    """Look up the stored duty message id for ``chat_id`` (blocking DB access).

    Returns:
        The value from ``get_message_id``; callers treat ``None`` as
        "no pin record for this chat".
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_message_id(db, chat_id)
|
|
|
|
|
|
async def _schedule_next_update(
    application, chat_id: int, when_utc: datetime | None
) -> None:
    """Schedule the next pin refresh for a chat, replacing any pending job.

    Args:
        application: Object exposing ``job_queue``; when the queue is ``None``
            a warning is logged and nothing is scheduled.
        chat_id: Chat to refresh; also used to build the job name.
        when_utc: Moment (UTC) at which to refresh. Naive values are treated
            as UTC; timezone-aware values are converted to UTC first. ``None``
            means no next shift end is known, so a retry is scheduled in
            ``RETRY_WHEN_NO_DUTY_MINUTES`` minutes.
    """
    # Function-scope import: the module header imports only datetime/timezone.
    from datetime import timedelta

    job_queue = application.job_queue
    if job_queue is None:
        logger.warning("Job queue not available, cannot schedule pin update")
        return

    name = f"{JOB_NAME_PREFIX}{chat_id}"
    # Keep at most one pending refresh job per chat.
    for job in job_queue.get_jobs_by_name(name):
        job.schedule_removal()

    if when_utc is None:
        delay = timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES)
    else:
        target = when_utc
        if target.tzinfo is not None:
            # Mixing aware and naive datetimes raises TypeError on subtraction,
            # so normalise aware values to naive UTC.
            target = target.astimezone(timezone.utc).replace(tzinfo=None)
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        # A moment in the past (or less than 1s away) still runs, after 1s.
        delay = max(target - now_utc, timedelta(seconds=1))

    job_queue.run_once(
        update_group_pin,
        when=delay,
        data={"chat_id": chat_id},
        name=name,
    )

    if when_utc is None:
        logger.info(
            "No next shift for chat_id=%s; scheduled retry in %s min",
            chat_id,
            RETRY_WHEN_NO_DUTY_MINUTES,
        )
    else:
        logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc)
|
|
|
|
|
|
async def _refresh_pin_for_chat(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int
) -> Literal["updated", "no_message", "failed"]:
    """Refresh the pinned duty message: send new, unpin old, pin new, save id.

    The stored message id, the duty text and the next shift end are read in
    one DB session. Whenever a pin record exists, the next update is
    (re)scheduled regardless of the outcome.

    The previous duty message is unpinned by its stored id so that other
    pinned messages in the chat are left alone. A failure to unpin it is
    logged and does not prevent pinning the new message.

    Args:
        context: Handler/job context providing ``bot`` and ``application``.
        chat_id: Chat whose pinned duty message is refreshed.

    Returns:
        "updated" if the message was sent, pinned and saved successfully;
        "no_message" if there is no pin record for this chat;
        "failed" if sending or pinning the new message failed.
    """
    loop = asyncio.get_running_loop()
    message_id, text, next_end = await loop.run_in_executor(
        None,
        lambda: _sync_get_pin_refresh_data(chat_id, config.DEFAULT_LANGUAGE),
    )
    if message_id is None:
        logger.info("No pin record for chat_id=%s, skipping update", chat_id)
        return "no_message"

    try:
        msg = await context.bot.send_message(chat_id=chat_id, text=text)
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to send duty message for pin refresh chat_id=%s: %s", chat_id, e
        )
        await _schedule_next_update(context.application, chat_id, next_end)
        return "failed"

    # Best effort: target the old duty message explicitly. Without message_id
    # Telegram unpins the most recent pinned message, which may not be ours.
    try:
        await context.bot.unpin_chat_message(chat_id=chat_id, message_id=message_id)
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to unpin previous duty message chat_id=%s message_id=%s: %s",
            chat_id,
            message_id,
            e,
        )

    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=msg.message_id,
            disable_notification=not config.DUTY_PIN_NOTIFY,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to pin refreshed duty message chat_id=%s: %s", chat_id, e
        )
        await _schedule_next_update(context.application, chat_id, next_end)
        return "failed"

    await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)
    await _schedule_next_update(context.application, chat_id, next_end)
    return "updated"
|
|
|
|
|
|
async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Job callback: refresh the pinned duty message for the job's chat.

    Reads ``chat_id`` from ``context.job.data`` and does nothing when it is
    missing. Rescheduling happens inside ``_refresh_pin_for_chat``.
    """
    target_chat = context.job.data.get("chat_id")
    if target_chat is not None:
        await _refresh_pin_for_chat(context, target_chat)
|
|
|
|
|
|
async def my_chat_member_handler(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Handle the bot being added to or removed from a group.

    Only updates about the bot itself in ``group``/``supergroup`` chats are
    processed.

    * Added (status LEFT/BANNED -> MEMBER/ADMINISTRATOR): send the duty
      message, try to pin it silently, save the message id even if pinning
      failed, send a "make me admin" hint if pinning failed, then schedule
      the next update.
    * Removed (new status LEFT/BANNED): delete the pin record and remove the
      chat's scheduled jobs.
    """
    if not update.my_chat_member or not update.effective_user:
        return
    old = update.my_chat_member.old_chat_member
    new = update.my_chat_member.new_chat_member
    chat = update.effective_chat
    if not chat or chat.type not in ("group", "supergroup"):
        return
    if new.user.id != context.bot.id:
        return
    chat_id = chat.id

    if new.status in (
        ChatMemberStatus.MEMBER,
        ChatMemberStatus.ADMINISTRATOR,
    ) and old.status in (
        ChatMemberStatus.LEFT,
        ChatMemberStatus.BANNED,
    ):
        loop = asyncio.get_running_loop()
        lang = get_lang(update.effective_user)
        text = await loop.run_in_executor(
            None, lambda: _get_duty_message_text_sync(lang)
        )
        try:
            msg = await context.bot.send_message(chat_id=chat_id, text=text)
        except (BadRequest, Forbidden) as e:
            logger.warning("Failed to send duty message in chat_id=%s: %s", chat_id, e)
            return

        pinned = False
        try:
            await context.bot.pin_chat_message(
                chat_id=chat_id,
                message_id=msg.message_id,
                disable_notification=True,
            )
            pinned = True
        except (BadRequest, Forbidden) as e:
            logger.warning("Failed to pin message in chat_id=%s: %s", chat_id, e)

        # Saved even when pinning failed, so later commands can reuse the message.
        await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)

        if not pinned:
            try:
                await context.bot.send_message(
                    chat_id=chat_id,
                    text=t(lang, "pin_duty.could_not_pin_make_admin"),
                )
            except (BadRequest, Forbidden) as e:
                # The hint is best effort; record the failure instead of
                # swallowing it silently.
                logger.warning(
                    "Failed to send make-admin hint in chat_id=%s: %s", chat_id, e
                )

        next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
        await _schedule_next_update(context.application, chat_id, next_end)
        return

    if new.status in (ChatMemberStatus.LEFT, ChatMemberStatus.BANNED):
        await asyncio.get_running_loop().run_in_executor(
            None, _sync_delete_pin, chat_id
        )
        name = f"{JOB_NAME_PREFIX}{chat_id}"
        if context.application.job_queue:
            for job in context.application.job_queue.get_jobs_by_name(name):
                job.schedule_removal()
        logger.info("Bot left chat_id=%s, removed pin record and jobs", chat_id)
|
|
|
|
|
|
def _get_all_pin_chat_ids_sync() -> list[int]:
    """Return the chat ids reported by ``get_all_pin_chat_ids`` (blocking DB access)."""
    with session_scope(config.DATABASE_URL) as db:
        return get_all_pin_chat_ids(db)
|
|
|
|
|
|
async def restore_group_pin_jobs(application) -> None:
    """Re-create the pin-update job for every chat with a pin record (startup).

    The next shift end is read once and used for all chats.
    """
    loop = asyncio.get_running_loop()
    pinned_chats = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    for pinned_chat in pinned_chats:
        await _schedule_next_update(application, pinned_chat, shift_end)
    logger.info("Restored %s group pin jobs", len(pinned_chats))
|
|
|
|
|
|
async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /pin_duty: pin the duty message in the group.

    Only works in ``group``/``supergroup`` chats; elsewhere the user gets the
    ``pin_duty.group_only`` reply.

    * If a message id is stored for the chat, that message is pinned and the
      user gets ``pin_duty.pinned`` (or ``pin_duty.failed`` on error).
    * Otherwise a new duty message is sent and pinned, its id is saved and
      the next update is scheduled. The id is saved and the update scheduled
      even when pinning fails; the reply then asks to make the bot an admin.
      If the message cannot be sent at all, the reply is ``pin_duty.failed``.
    """
    if not (update.message and update.effective_chat and update.effective_user):
        return
    chat = update.effective_chat
    lang = get_lang(update.effective_user)
    if chat.type not in ("group", "supergroup"):
        await update.message.reply_text(t(lang, "pin_duty.group_only"))
        return

    chat_id = chat.id
    loop = asyncio.get_running_loop()
    stored_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)

    # Known duty message: just (re)pin it.
    if stored_id is not None:
        try:
            await context.bot.pin_chat_message(
                chat_id=chat_id,
                message_id=stored_id,
                disable_notification=True,
            )
            await update.message.reply_text(t(lang, "pin_duty.pinned"))
        except (BadRequest, Forbidden) as e:
            logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e)
            await update.message.reply_text(t(lang, "pin_duty.failed"))
        return

    # No record yet: send a fresh duty message first.
    duty_text = await loop.run_in_executor(
        None, lambda: _get_duty_message_text_sync(lang)
    )
    try:
        sent = await context.bot.send_message(chat_id=chat_id, text=duty_text)
    except (BadRequest, Forbidden) as e:
        logger.warning(
            "Failed to send duty message for pin_duty chat_id=%s: %s", chat_id, e
        )
        await update.message.reply_text(t(lang, "pin_duty.failed"))
        return

    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=sent.message_id,
            disable_notification=True,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("Failed to pin message for pin_duty chat_id=%s: %s", chat_id, e)
        was_pinned = False
    else:
        was_pinned = True

    # The record is stored and the refresh scheduled whether or not pinning worked.
    await loop.run_in_executor(None, _sync_save_pin, chat_id, sent.message_id)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, shift_end)

    reply_key = (
        "pin_duty.pinned" if was_pinned else "pin_duty.could_not_pin_make_admin"
    )
    await update.message.reply_text(t(lang, reply_key))
|
|
|
|
|
|
async def refresh_pin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /refresh_pin: refresh the pinned duty message right away.

    Outside ``group``/``supergroup`` chats the user gets the
    ``refresh_pin.group_only`` reply. Otherwise the outcome of
    ``_refresh_pin_for_chat`` selects the reply key ``refresh_pin.<outcome>``.
    """
    if not (update.message and update.effective_chat and update.effective_user):
        return
    chat = update.effective_chat
    lang = get_lang(update.effective_user)
    if chat.type not in ("group", "supergroup"):
        await update.message.reply_text(t(lang, "refresh_pin.group_only"))
        return
    outcome = await _refresh_pin_for_chat(context, chat.id)
    await update.message.reply_text(t(lang, f"refresh_pin.{outcome}"))
|
|
|
|
|
|
# Handler for updates about the bot's own chat membership (added to / removed from chats).
group_duty_pin_handler = ChatMemberHandler(
    my_chat_member_handler, ChatMemberHandler.MY_CHAT_MEMBER
)

# /pin_duty command handler.
pin_duty_handler = CommandHandler("pin_duty", pin_duty_cmd)

# /refresh_pin command handler.
refresh_pin_handler = CommandHandler("refresh_pin", refresh_pin_cmd)
|