All checks were successful
CI / lint-and-test (push) Successful in 23s
- Introduced a new configuration option `DUTY_PIN_NOTIFY` to control whether the bot re-pins the duty message when updated, providing notifications to group members. - Updated the architecture documentation to reflect the new functionality of re-pinning duty messages. - Enhanced the `.env.example` file to include the new configuration option with a description. - Added tests to verify the behavior of the new refresh pin command and its integration with the existing group duty pin functionality. - Updated internationalization messages to include help text for the new `/refresh_pin` command.
245 lines
9.2 KiB
Python
245 lines
9.2 KiB
Python
"""Command handlers: /start, /help; /start registers user."""
|
|
|
|
import asyncio
|
|
|
|
import duty_teller.config as config
|
|
from telegram import Update
|
|
from telegram.ext import CommandHandler, ContextTypes
|
|
|
|
from duty_teller.db.models import User
|
|
from duty_teller.db.session import session_scope
|
|
from duty_teller.db.repository import (
|
|
get_or_create_user,
|
|
get_user_by_telegram_id,
|
|
get_user_by_username,
|
|
set_user_phone,
|
|
create_calendar_token,
|
|
can_access_miniapp_for_telegram_user,
|
|
set_user_role,
|
|
ROLE_USER,
|
|
ROLE_ADMIN,
|
|
)
|
|
from duty_teller.handlers.common import is_admin_async
|
|
from duty_teller.i18n import get_lang, t
|
|
from duty_teller.utils.user import build_full_name
|
|
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /start: register user in DB and send greeting."""
|
|
if not update.message:
|
|
return
|
|
user = update.effective_user
|
|
if not user:
|
|
return
|
|
full_name = build_full_name(user.first_name, user.last_name)
|
|
telegram_user_id = user.id
|
|
username = user.username
|
|
first_name = user.first_name
|
|
last_name = user.last_name
|
|
|
|
def do_get_or_create() -> None:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
get_or_create_user(
|
|
session,
|
|
telegram_user_id=telegram_user_id,
|
|
full_name=full_name,
|
|
username=username,
|
|
first_name=first_name,
|
|
last_name=last_name,
|
|
)
|
|
|
|
await asyncio.get_running_loop().run_in_executor(None, do_get_or_create)
|
|
|
|
lang = get_lang(user)
|
|
text = t(lang, "start.greeting")
|
|
await update.message.reply_text(text)
|
|
|
|
|
|
async def set_phone(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /set_phone [number]: set or clear phone (private chat only)."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
lang = get_lang(update.effective_user)
|
|
if update.effective_chat and update.effective_chat.type != "private":
|
|
await update.message.reply_text(t(lang, "set_phone.private_only"))
|
|
return
|
|
args = context.args or []
|
|
phone = " ".join(args).strip() if args else None
|
|
telegram_user_id = update.effective_user.id
|
|
|
|
def do_set_phone() -> str | None:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
full_name = build_full_name(
|
|
update.effective_user.first_name, update.effective_user.last_name
|
|
)
|
|
get_or_create_user(
|
|
session,
|
|
telegram_user_id=telegram_user_id,
|
|
full_name=full_name,
|
|
username=update.effective_user.username,
|
|
first_name=update.effective_user.first_name,
|
|
last_name=update.effective_user.last_name,
|
|
)
|
|
user = set_user_phone(session, telegram_user_id, phone or None)
|
|
if user is None:
|
|
return "error"
|
|
if phone:
|
|
return "saved"
|
|
return "cleared"
|
|
|
|
result = await asyncio.get_running_loop().run_in_executor(None, do_set_phone)
|
|
if result == "error":
|
|
await update.message.reply_text(t(lang, "set_phone.error"))
|
|
elif result == "saved":
|
|
await update.message.reply_text(t(lang, "set_phone.saved", phone=phone or ""))
|
|
else:
|
|
await update.message.reply_text(t(lang, "set_phone.cleared"))
|
|
|
|
|
|
async def calendar_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /calendar_link: send personal ICS URL (private chat only; user must be in allowlist)."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
lang = get_lang(update.effective_user)
|
|
if update.effective_chat and update.effective_chat.type != "private":
|
|
await update.message.reply_text(t(lang, "calendar_link.private_only"))
|
|
return
|
|
telegram_user_id = update.effective_user.id
|
|
full_name = build_full_name(
|
|
update.effective_user.first_name, update.effective_user.last_name
|
|
)
|
|
|
|
def do_calendar_link() -> tuple[str | None, str | None]:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
user = get_or_create_user(
|
|
session,
|
|
telegram_user_id=telegram_user_id,
|
|
full_name=full_name,
|
|
username=update.effective_user.username,
|
|
first_name=update.effective_user.first_name,
|
|
last_name=update.effective_user.last_name,
|
|
)
|
|
if not can_access_miniapp_for_telegram_user(session, telegram_user_id):
|
|
return (None, "denied")
|
|
token = create_calendar_token(session, user.id)
|
|
return (token, None)
|
|
|
|
result_token, error = await asyncio.get_running_loop().run_in_executor(
|
|
None, do_calendar_link
|
|
)
|
|
if error == "denied":
|
|
await update.message.reply_text(t(lang, "calendar_link.access_denied"))
|
|
return
|
|
if not result_token:
|
|
await update.message.reply_text(t(lang, "calendar_link.error"))
|
|
return
|
|
base = (config.MINI_APP_BASE_URL or "").rstrip("/")
|
|
if not base:
|
|
await update.message.reply_text(t(lang, "calendar_link.error"))
|
|
return
|
|
url_personal = f"{base}/api/calendar/ical/{result_token}.ics"
|
|
url_team = f"{base}/api/calendar/ical/team/{result_token}.ics"
|
|
await update.message.reply_text(
|
|
t(
|
|
lang,
|
|
"calendar_link.success",
|
|
url_personal=url_personal,
|
|
url_team=url_team,
|
|
)
|
|
+ "\n\n"
|
|
+ t(lang, "calendar_link.help_hint")
|
|
)
|
|
|
|
|
|
async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /help: send list of commands (admins see import_duty_schedule)."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
lang = get_lang(update.effective_user)
|
|
lines = [
|
|
t(lang, "help.title"),
|
|
t(lang, "help.start"),
|
|
t(lang, "help.help"),
|
|
t(lang, "help.set_phone"),
|
|
t(lang, "help.calendar_link"),
|
|
t(lang, "help.pin_duty"),
|
|
t(lang, "help.refresh_pin"),
|
|
]
|
|
if await is_admin_async(update.effective_user.id):
|
|
lines.append(t(lang, "help.import_schedule"))
|
|
lines.append(t(lang, "help.set_role"))
|
|
await update.message.reply_text("\n".join(lines))
|
|
|
|
|
|
async def set_role(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Handle /set_role: set user role (admin only). Usage: /set_role @username user|admin or reply + user|admin."""
|
|
if not update.message or not update.effective_user:
|
|
return
|
|
lang = get_lang(update.effective_user)
|
|
if not await is_admin_async(update.effective_user.id):
|
|
await update.message.reply_text(t(lang, "import.admin_only"))
|
|
return
|
|
args = (context.args or [])[:2]
|
|
# Resolve target: reply -> telegram_user_id; or first arg @username / numeric telegram_id
|
|
target_user = None
|
|
role_name = None
|
|
if update.message.reply_to_message and update.message.reply_to_message.from_user:
|
|
target_telegram_id = update.message.reply_to_message.from_user.id
|
|
role_name = (args[0] or "").strip().lower() if args else None
|
|
|
|
def do_get_reply() -> User | None:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
return get_user_by_telegram_id(session, target_telegram_id)
|
|
|
|
target_user = await asyncio.get_running_loop().run_in_executor(
|
|
None, do_get_reply
|
|
)
|
|
elif len(args) >= 2:
|
|
first = (args[0] or "").strip()
|
|
role_name = (args[1] or "").strip().lower()
|
|
if first.lstrip("@").isdigit():
|
|
target_telegram_id = int(first.lstrip("@"))
|
|
|
|
def do_get_by_tid() -> User | None:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
return get_user_by_telegram_id(session, target_telegram_id)
|
|
|
|
target_user = await asyncio.get_running_loop().run_in_executor(
|
|
None, do_get_by_tid
|
|
)
|
|
else:
|
|
|
|
def do_get_by_username() -> User | None:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
return get_user_by_username(session, first)
|
|
|
|
target_user = await asyncio.get_running_loop().run_in_executor(
|
|
None, do_get_by_username
|
|
)
|
|
if not role_name or role_name not in (ROLE_USER, ROLE_ADMIN):
|
|
await update.message.reply_text(t(lang, "set_role.usage"))
|
|
return
|
|
if not target_user:
|
|
await update.message.reply_text(t(lang, "set_role.user_not_found"))
|
|
return
|
|
|
|
def do_set_role() -> bool:
|
|
with session_scope(config.DATABASE_URL) as session:
|
|
updated = set_user_role(session, target_user.id, role_name)
|
|
return updated is not None
|
|
|
|
ok = await asyncio.get_running_loop().run_in_executor(None, do_set_role)
|
|
if ok:
|
|
await update.message.reply_text(
|
|
t(lang, "set_role.done", name=target_user.full_name, role=role_name)
|
|
)
|
|
else:
|
|
await update.message.reply_text(t(lang, "set_role.error"))
|
|
|
|
|
|
start_handler = CommandHandler("start", start)
|
|
help_handler = CommandHandler("help", help_cmd)
|
|
set_phone_handler = CommandHandler("set_phone", set_phone)
|
|
calendar_link_handler = CommandHandler("calendar_link", calendar_link)
|
|
set_role_handler = CommandHandler("set_role", set_role)
|