All checks were successful
CI / lint-and-test (push) Successful in 24s
- Changed the behavior of the group duty pin feature to send a new message, unpin the old one, and pin the new one instead of editing the existing message. This ensures the pinned message is always fresh.
- Updated the `DUTY_PIN_NOTIFY` configuration description in the documentation to reflect the new message handling approach.
- Revised the architecture documentation to clarify the updated group duty pin process.
- Enhanced tests to verify the new behavior of the group duty pin functionality, ensuring proper message handling and scheduling.
275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""Pinned duty message in groups: handle bot add/remove, schedule updates at shift end."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Literal
|
|
|
|
import duty_teller.config as config
|
|
from telegram import Update
|
|
from telegram.constants import ChatMemberStatus
|
|
from telegram.error import BadRequest, Forbidden
|
|
from telegram.ext import ChatMemberHandler, CommandHandler, ContextTypes
|
|
|
|
from duty_teller.db.session import session_scope
|
|
from duty_teller.i18n import get_lang, t
|
|
from duty_teller.services.group_duty_pin_service import (
|
|
get_duty_message_text,
|
|
get_next_shift_end_utc,
|
|
save_pin,
|
|
delete_pin,
|
|
get_message_id,
|
|
get_all_pin_chat_ids,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Scheduled refresh jobs are named with this prefix followed by the chat id.
JOB_NAME_PREFIX = "duty_pin_"

# Minutes to wait before trying again when no next shift end is available.
RETRY_WHEN_NO_DUTY_MINUTES = 15
|
|
|
|
|
|
def _get_duty_message_text_sync(lang: str = "en") -> str:
    """Return the duty message text, fetched inside its own DB session.

    Blocking helper: opens ``session_scope(config.DATABASE_URL)`` and delegates
    to ``get_duty_message_text`` with ``config.DUTY_DISPLAY_TZ``.

    Args:
        lang: Language code forwarded to ``get_duty_message_text``.
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_duty_message_text(db, config.DUTY_DISPLAY_TZ, lang)
|
|
|
|
|
|
def _get_next_shift_end_sync():
    """Return ``get_next_shift_end_utc`` evaluated inside its own DB session.

    Blocking helper; callers in this module run it in an executor.
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_next_shift_end_utc(db)
|
|
|
|
|
|
def _sync_save_pin(chat_id: int, message_id: int) -> None:
    """Store ``message_id`` as the pin record for ``chat_id`` (blocking).

    Opens its own session via ``session_scope`` and delegates to ``save_pin``.
    """
    with session_scope(config.DATABASE_URL) as db:
        save_pin(db, chat_id, message_id)
|
|
|
|
|
|
def _sync_delete_pin(chat_id: int) -> None:
    """Remove the pin record for ``chat_id`` (blocking).

    Opens its own session via ``session_scope`` and delegates to ``delete_pin``.
    """
    with session_scope(config.DATABASE_URL) as db:
        delete_pin(db, chat_id)
|
|
|
|
|
|
def _sync_get_message_id(chat_id: int) -> int | None:
    """Look up the stored duty message id for ``chat_id`` (blocking).

    Returns:
        Whatever ``get_message_id`` yields for the chat; callers in this module
        treat ``None`` as "no pin record".
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_message_id(db, chat_id)
|
|
|
|
|
|
async def _schedule_next_update(
    application, chat_id: int, when_utc: datetime | None
) -> None:
    """Replace the pending pin-update job for a chat with a freshly scheduled one.

    Every job already registered under ``JOB_NAME_PREFIX + chat_id`` is marked
    for removal first, so at most one refresh stays pending per chat.

    Args:
        application: Object exposing ``job_queue``. When the queue is ``None``
            a warning is logged and nothing is scheduled.
        chat_id: Chat whose pin refresh is being scheduled.
        when_utc: Moment of the next refresh in UTC. A naive value is
            interpreted as UTC; an aware value is converted to UTC. ``None``
            means no next shift end is known, so a retry is scheduled
            ``RETRY_WHEN_NO_DUTY_MINUTES`` minutes from now.
    """
    from datetime import timedelta

    job_queue = application.job_queue
    if job_queue is None:
        logger.warning("Job queue not available, cannot schedule pin update")
        return

    name = f"{JOB_NAME_PREFIX}{chat_id}"
    for job in job_queue.get_jobs_by_name(name):
        job.schedule_removal()

    if when_utc is None:
        delay = timedelta(minutes=RETRY_WHEN_NO_DUTY_MINUTES)
    else:
        # Normalise to an aware UTC datetime so that both naive-UTC and
        # timezone-aware inputs can be subtracted from "now" without TypeError.
        if when_utc.utcoffset() is None:
            target = when_utc.replace(tzinfo=timezone.utc)
        else:
            target = when_utc.astimezone(timezone.utc)
        # Never schedule in the past: fire at the earliest one second from now.
        delay = max(target - datetime.now(timezone.utc), timedelta(seconds=1))

    job_queue.run_once(
        update_group_pin,
        when=delay,
        data={"chat_id": chat_id},
        name=name,
    )

    if when_utc is None:
        logger.info(
            "No next shift for chat_id=%s; scheduled retry in %s min",
            chat_id,
            RETRY_WHEN_NO_DUTY_MINUTES,
        )
    else:
        logger.info("Scheduled pin update for chat_id=%s at %s", chat_id, when_utc)
|
|
|
|
|
|
async def _refresh_pin_for_chat(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int
) -> Literal["updated", "no_message", "failed"]:
    """Refresh the pinned duty message: send new, unpin old, pin new, save new id.

    The previously stored duty message is unpinned by its own message id, so a
    newer message pinned by someone else is never unpinned by mistake. Unpinning
    is best-effort: if it fails, the failure is logged and the new message is
    still pinned.

    Except for the "no_message" case, the next update is always rescheduled
    before this coroutine finishes, including when an unexpected exception
    propagates, so one failed run does not end the one-shot job chain.

    Args:
        context: Callback context providing ``bot`` and ``application``.
        chat_id: Chat whose pinned duty message is refreshed.

    Returns:
        "updated" if the message was sent, pinned and saved successfully;
        "no_message" if there is no pin record for this chat;
        "failed" if sending or pinning the new message was rejected
        (``BadRequest`` / ``Forbidden``).
    """
    loop = asyncio.get_running_loop()
    old_message_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    if old_message_id is None:
        logger.info("No pin record for chat_id=%s, skipping update", chat_id)
        return "no_message"

    text = await loop.run_in_executor(
        None, lambda: _get_duty_message_text_sync(config.DEFAULT_LANGUAGE)
    )

    try:
        try:
            msg = await context.bot.send_message(chat_id=chat_id, text=text)
        except (BadRequest, Forbidden) as e:
            logger.warning(
                "Failed to send duty message for pin refresh chat_id=%s: %s", chat_id, e
            )
            return "failed"

        try:
            # Target the stored message explicitly; without message_id Telegram
            # unpins the most recently pinned message in the chat.
            await context.bot.unpin_chat_message(
                chat_id=chat_id, message_id=old_message_id
            )
        except (BadRequest, Forbidden) as e:
            # The old message may be gone or no longer pinned; still pin the new one.
            logger.warning(
                "Failed to unpin previous duty message chat_id=%s: %s", chat_id, e
            )

        try:
            await context.bot.pin_chat_message(
                chat_id=chat_id,
                message_id=msg.message_id,
                disable_notification=not config.DUTY_PIN_NOTIFY,
            )
        except (BadRequest, Forbidden) as e:
            logger.warning(
                "Unpin or pin after refresh failed chat_id=%s: %s", chat_id, e
            )
            return "failed"

        await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)
        return "updated"
    finally:
        # Single rescheduling point for every outcome after the record lookup.
        next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
        await _schedule_next_update(context.application, chat_id, next_end)
|
|
|
|
|
|
async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Job callback: refresh the pinned duty message for the job's chat.

    The chat is read from the ``"chat_id"`` key of ``context.job.data``; when
    it is missing or ``None`` the callback does nothing. Scheduling of the next
    update is done by ``_refresh_pin_for_chat``.
    """
    target_chat = context.job.data.get("chat_id")
    if target_chat is not None:
        await _refresh_pin_for_chat(context, target_chat)
|
|
|
|
|
|
async def my_chat_member_handler(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Handle the bot being added to or removed from a group or supergroup.

    * Added (old status LEFT/BANNED, new status MEMBER/ADMINISTRATOR): send the
      duty message, try to pin it without notification, store its message id
      and schedule the next refresh. If pinning fails, the localized
      ``pin_duty.could_not_pin_make_admin`` text is sent to the chat.
    * Removed (new status LEFT/BANNED): delete the pin record and mark the
      chat's scheduled refresh jobs for removal.

    Updates without ``my_chat_member`` or an effective user, updates for other
    chat types, and updates about a user other than this bot are ignored.
    """
    if not update.my_chat_member or not update.effective_user:
        return
    old = update.my_chat_member.old_chat_member
    new = update.my_chat_member.new_chat_member
    chat = update.effective_chat
    if not chat or chat.type not in ("group", "supergroup"):
        return
    if new.user.id != context.bot.id:
        return
    chat_id = chat.id

    if new.status in (
        ChatMemberStatus.MEMBER,
        ChatMemberStatus.ADMINISTRATOR,
    ) and old.status in (
        ChatMemberStatus.LEFT,
        ChatMemberStatus.BANNED,
    ):
        loop = asyncio.get_running_loop()
        lang = get_lang(update.effective_user)
        text = await loop.run_in_executor(
            None, lambda: _get_duty_message_text_sync(lang)
        )
        try:
            msg = await context.bot.send_message(chat_id=chat_id, text=text)
        except (BadRequest, Forbidden) as e:
            logger.warning("Failed to send duty message in chat_id=%s: %s", chat_id, e)
            return

        pinned = False
        try:
            await context.bot.pin_chat_message(
                chat_id=chat_id,
                message_id=msg.message_id,
                disable_notification=True,
            )
            pinned = True
        except (BadRequest, Forbidden) as e:
            logger.warning("Failed to pin message in chat_id=%s: %s", chat_id, e)

        # The message id is stored even when pinning failed, so the message
        # can still be pinned or refreshed later.
        await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)

        if not pinned:
            try:
                await context.bot.send_message(
                    chat_id=chat_id,
                    text=t(lang, "pin_duty.could_not_pin_make_admin"),
                )
            except (BadRequest, Forbidden) as e:
                # Best-effort hint: record the failure instead of dropping it silently.
                logger.warning(
                    "Failed to send pin hint in chat_id=%s: %s", chat_id, e
                )

        next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
        await _schedule_next_update(context.application, chat_id, next_end)
        return

    if new.status in (ChatMemberStatus.LEFT, ChatMemberStatus.BANNED):
        await asyncio.get_running_loop().run_in_executor(
            None, _sync_delete_pin, chat_id
        )
        name = f"{JOB_NAME_PREFIX}{chat_id}"
        if context.application.job_queue:
            for job in context.application.job_queue.get_jobs_by_name(name):
                job.schedule_removal()
        logger.info("Bot left chat_id=%s, removed pin record and jobs", chat_id)
|
|
|
|
|
|
def _get_all_pin_chat_ids_sync() -> list[int]:
    """Return the result of ``get_all_pin_chat_ids`` from its own DB session.

    Blocking helper; ``restore_group_pin_jobs`` runs it in an executor.
    """
    with session_scope(config.DATABASE_URL) as db:
        return get_all_pin_chat_ids(db)
|
|
|
|
|
|
async def restore_group_pin_jobs(application) -> None:
    """Re-create the scheduled pin-update job for every chat with a pin record.

    Intended for startup. The next shift end is looked up once and the same
    value is used for every chat.
    """
    loop = asyncio.get_running_loop()
    pinned_chats = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync)
    shift_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    for pinned_chat in pinned_chats:
        await _schedule_next_update(application, pinned_chat, shift_end)
    logger.info("Restored %s group pin jobs", len(pinned_chats))
|
|
|
|
|
|
async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /pin_duty: pin the stored duty message of the current group.

    The message to pin is the one whose id is stored in the pin record for
    this chat; the command does not need to be sent as a reply.

    Replies with:
        ``pin_duty.group_only`` outside groups and supergroups;
        ``pin_duty.no_message`` when the chat has no pin record;
        ``pin_duty.pinned`` after a successful pin;
        ``pin_duty.failed`` when Telegram rejects the pin
        (``BadRequest`` / ``Forbidden``).
    """
    if not update.message or not update.effective_chat or not update.effective_user:
        return
    chat = update.effective_chat
    lang = get_lang(update.effective_user)
    if chat.type not in ("group", "supergroup"):
        await update.message.reply_text(t(lang, "pin_duty.group_only"))
        return
    chat_id = chat.id
    loop = asyncio.get_running_loop()
    message_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    if message_id is None:
        await update.message.reply_text(t(lang, "pin_duty.no_message"))
        return
    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=message_id,
            disable_notification=True,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e)
        await update.message.reply_text(t(lang, "pin_duty.failed"))
        return
    # Confirm outside the try block so that an error while replying is not
    # misreported as a failure to pin.
    await update.message.reply_text(t(lang, "pin_duty.pinned"))
|
|
|
|
|
|
async def refresh_pin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /refresh_pin: refresh the pinned duty message right away.

    Outside groups and supergroups the ``refresh_pin.group_only`` text is
    sent. Otherwise the reply key is ``refresh_pin.<result>``, where
    ``<result>`` is the value returned by ``_refresh_pin_for_chat``.
    """
    message = update.message
    chat = update.effective_chat
    user = update.effective_user
    if not (message and chat and user):
        return
    lang = get_lang(user)
    if chat.type in ("group", "supergroup"):
        outcome = await _refresh_pin_for_chat(context, chat.id)
        reply_key = f"refresh_pin.{outcome}"
    else:
        reply_key = "refresh_pin.group_only"
    await message.reply_text(t(lang, reply_key))
|
|
|
|
|
|
# Handler for changes to the bot's own chat membership.
group_duty_pin_handler = ChatMemberHandler(
    callback=my_chat_member_handler,
    chat_member_types=ChatMemberHandler.MY_CHAT_MEMBER,
)

# Command handlers for /pin_duty and /refresh_pin.
pin_duty_handler = CommandHandler(command="pin_duty", callback=pin_duty_cmd)
refresh_pin_handler = CommandHandler(command="refresh_pin", callback=refresh_pin_cmd)
|