Files
duty-teller/handlers/commands.py
Nikolay Tatarinov ef5dbca5df Implement duty schedule import functionality and enhance user management
- Added a new command `/import_duty_schedule` for importing duty schedules via JSON, restricted to admin users.
- Introduced a two-step import process: specifying handover time and uploading a JSON file.
- Updated the database schema to allow `telegram_user_id` to be nullable for user creation by full name.
- Implemented repository functions for user management, including `get_or_create_user_by_full_name` and `delete_duties_in_range`.
- Enhanced README documentation with details on the new import command and JSON format requirements.
- Added comprehensive tests for the duty schedule parser and integration tests for the import functionality.
2026-02-17 21:45:23 +03:00

63 lines
2.0 KiB
Python

"""Command handlers: /start, /help; /start registers user."""
import asyncio
import config
from telegram import Update
from telegram.ext import CommandHandler, ContextTypes
from db.session import get_session
from db.repository import get_or_create_user
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not update.message:
return
user = update.effective_user
if not user:
return
full_name = (
" ".join(filter(None, [user.first_name or "", user.last_name or ""])).strip()
or "User"
)
telegram_user_id = user.id
username = user.username
first_name = user.first_name
last_name = user.last_name
def do_get_or_create() -> None:
session = get_session(config.DATABASE_URL)
try:
get_or_create_user(
session,
telegram_user_id=telegram_user_id,
full_name=full_name,
username=username,
first_name=first_name,
last_name=last_name,
)
finally:
session.close()
await asyncio.get_running_loop().run_in_executor(None, do_get_or_create)
text = "Привет! Я бот календаря дежурств. Используй /help для списка команд."
await update.message.reply_text(text)
async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not update.message or not update.effective_user:
return
lines = [
"Доступные команды:",
"/start — Начать",
"/help — Показать эту справку",
]
if config.is_admin(update.effective_user.username or ""):
lines.append("/import_duty_schedule — Импорт расписания дежурств (JSON)")
await update.message.reply_text("\n".join(lines))
start_handler = CommandHandler("start", start)
help_handler = CommandHandler("help", help_cmd)