Files
duty-teller/handlers/import_duty_schedule.py
Nikolay Tatarinov ef5dbca5df Implement duty schedule import functionality and enhance user management
- Added a new command `/import_duty_schedule` for importing duty schedules via JSON, restricted to admin users.
- Introduced a two-step import process: specifying handover time and uploading a JSON file.
- Updated the database schema to allow `telegram_user_id` to be nullable for user creation by full name.
- Implemented repository functions for user management, including `get_or_create_user_by_full_name` and `delete_duties_in_range`.
- Enhanced README documentation with details on the new import command and JSON format requirements.
- Added comprehensive tests for the duty schedule parser and integration tests for the import functionality.
2026-02-17 21:45:23 +03:00

176 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Import duty-schedule: /import_duty_schedule (admin only). Two steps: handover time -> JSON file."""
import asyncio
import re
from datetime import date, datetime, timedelta, timezone
import config
from telegram import Update
from telegram.ext import CommandHandler, ContextTypes, MessageHandler, filters
from db.session import get_session
from db.repository import (
get_or_create_user_by_full_name,
delete_duties_in_range,
insert_duty,
)
from importers.duty_schedule import (
DutyScheduleParseError,
DutyScheduleResult,
parse_duty_schedule,
)
# HH:MM or HH:MM:SS, optional space + timezone (IANA or "UTC")
HANDOVER_TIME_RE = re.compile(
r"^\s*(\d{1,2}):(\d{2})(?::(\d{2}))?\s*(?:\s+(\S+))?\s*$", re.IGNORECASE
)
def _parse_handover_time(text: str) -> tuple[int, int] | None:
"""Parse handover time string to (hour_utc, minute_utc). Returns None on failure."""
m = HANDOVER_TIME_RE.match(text)
if not m:
return None
hour = int(m.group(1))
minute = int(m.group(2))
# second = m.group(3) ignored
tz_str = (m.group(4) or "").strip()
if not tz_str or tz_str.upper() == "UTC":
return (hour % 24, minute)
try:
from zoneinfo import ZoneInfo
except ImportError:
try:
from backports.zoneinfo import ZoneInfo # type: ignore
except ImportError:
return None
try:
tz = ZoneInfo(tz_str)
except Exception:
return None
# Build datetime in that tz and convert to UTC
dt = datetime(2000, 1, 1, hour, minute, 0, tzinfo=tz)
utc = dt.astimezone(timezone.utc)
return (utc.hour, utc.minute)
def _duty_to_iso(d: date, hour_utc: int, minute_utc: int) -> str:
"""ISO 8601 with Z for start of duty on date d at given UTC time."""
dt = datetime(d.year, d.month, d.day, hour_utc, minute_utc, 0, tzinfo=timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
async def import_duty_schedule_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /import_duty_schedule: start the two-step import dialogue (admin only).

    Ignores updates without a message or an effective user. Non-admins (as
    decided by ``config.is_admin`` on the sender's username) get a refusal
    message. For admins, the per-user state is reset, the
    ``awaiting_handover_time`` flag is set in ``context.user_data`` and the
    user is asked for the handover time.
    """
    if not update.message or not update.effective_user:
        return
    if not config.is_admin(update.effective_user.username or ""):
        await update.message.reply_text("Доступ только для администраторов.")
        return
    # Restarting the command must begin from a clean slate: drop state left by
    # an earlier, unfinished run so that a document sent before the new time
    # is entered is not imported with a stale handover time.
    context.user_data.pop("awaiting_duty_schedule_file", None)
    context.user_data.pop("handover_utc_time", None)
    context.user_data["awaiting_handover_time"] = True
    await update.message.reply_text(
        "Укажите время пересменки в формате ЧЧ:ММ и часовой пояс, "
        "например 09:00 Europe/Moscow или 06:00 UTC."
    )
async def handle_handover_time_text(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Step 1 of the import dialogue: read the handover time from a text message.

    Does nothing unless the ``awaiting_handover_time`` flag is set in
    ``context.user_data`` and the sender passes ``config.is_admin``. On a
    parse failure the user is asked to try again and the state is left
    untouched. On success the UTC ``(hour, minute)`` pair is stored under
    ``handover_utc_time``, the dialogue moves on to waiting for the file and
    the user is asked to upload it.
    """
    msg = update.message
    if not (msg and update.effective_user and msg.text):
        return
    state = context.user_data
    if not state.get("awaiting_handover_time"):
        return
    if not config.is_admin(update.effective_user.username or ""):
        return

    handover = _parse_handover_time(msg.text.strip())
    if handover is None:
        await msg.reply_text(
            "Не удалось разобрать время. Укажите, например: 09:00 Europe/Moscow"
        )
        return

    hour_utc, minute_utc = handover
    # Advance the dialogue: time received, now expect the JSON document.
    state.update(
        {
            "handover_utc_time": (hour_utc, minute_utc),
            "awaiting_handover_time": False,
            "awaiting_duty_schedule_file": True,
        }
    )
    await msg.reply_text("Отправьте файл в формате duty-schedule (JSON).")
def _run_import(
    database_url: str,
    result: DutyScheduleResult,
    hour_utc: int,
    minute_utc: int,
) -> tuple[int, int]:
    """Write a parsed duty schedule to the database (blocking).

    For every ``(full_name, duty_dates)`` pair in ``result.entries`` the user
    is obtained via ``get_or_create_user_by_full_name``, then
    ``delete_duties_in_range`` is called for that user with the schedule's
    ``start_date``/``end_date`` (as ISO strings), and one duty is inserted per
    date, running from the handover time on that date to the handover time on
    the following day.

    Returns:
        ``(number of entries in the schedule, number of duties inserted)``.

    The session obtained from ``get_session`` is always closed.
    NOTE(review): no commit/rollback happens here; presumably the repository
    functions handle transactions themselves. Confirm against db.repository.
    """
    db = get_session(database_url)
    try:
        range_start = result.start_date.isoformat()
        range_end = result.end_date.isoformat()
        one_day = timedelta(days=1)
        inserted = 0
        for person_name, days in result.entries:
            owner = get_or_create_user_by_full_name(db, person_name)
            delete_duties_in_range(db, owner.id, range_start, range_end)
            for day in days:
                begins = _duty_to_iso(day, hour_utc, minute_utc)
                ends = _duty_to_iso(day + one_day, hour_utc, minute_utc)
                insert_duty(db, owner.id, begins, ends)
                inserted += 1
        return (len(result.entries), inserted)
    finally:
        db.close()
async def handle_duty_schedule_document(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Step 2 of the import dialogue: receive the JSON document and import it.

    Does nothing unless ``awaiting_duty_schedule_file`` is set, a handover
    time is stored in ``context.user_data`` and the sender passes
    ``config.is_admin``. A file whose name does not end in ``.json`` is
    rejected with a message and the dialogue state is kept. Otherwise the
    file is downloaded and parsed, the import runs in the default executor,
    and the outcome (or the error text) is sent back. The dialogue state is
    cleared after a parse error and after any import attempt.
    """
    msg = update.message
    if not (msg and msg.document and update.effective_user):
        return
    state = context.user_data
    if not state.get("awaiting_duty_schedule_file"):
        return
    handover = state.get("handover_utc_time")
    if not (handover and config.is_admin(update.effective_user.username or "")):
        return

    document = msg.document
    filename = (document.file_name or "").lower()
    if not filename.endswith(".json"):
        await msg.reply_text("Нужен файл с расширением .json")
        return

    hour_utc, minute_utc = handover
    state_keys = ("awaiting_duty_schedule_file", "handover_utc_time")

    # Fetch the uploaded file into memory.
    tg_file = await context.bot.get_file(document.file_id)
    payload = bytes(await tg_file.download_as_bytearray())

    try:
        schedule = parse_duty_schedule(payload)
    except DutyScheduleParseError as exc:
        for key in state_keys:
            state.pop(key, None)
        await msg.reply_text(f"Ошибка разбора файла: {exc}")
        return

    def _job() -> tuple[int, int]:
        # Runs in a worker thread so the blocking DB work stays off the event loop.
        return _run_import(config.DATABASE_URL, schedule, hour_utc, minute_utc)

    event_loop = asyncio.get_running_loop()
    try:
        num_users, num_duties = await event_loop.run_in_executor(None, _job)
    except Exception as exc:
        # Handler boundary: report any import failure to the admin instead of raising.
        await msg.reply_text(f"Ошибка импорта: {exc}")
    else:
        await msg.reply_text(
            f"Импорт выполнен: {num_users} пользователей, {num_duties} дежурств."
        )
    finally:
        for key in state_keys:
            state.pop(key, None)
# Entry point: the /import_duty_schedule command starts the dialogue.
import_duty_schedule_handler = CommandHandler(
    command="import_duty_schedule",
    callback=import_duty_schedule_cmd,
)
# Step 1: handover time sent as a plain (non-command) text message.
# NOTE(review): this filter matches every non-command text message; the
# callback returns early unless the dialogue is active. Confirm the handler
# group/order in the application setup so other text handlers still run.
handover_time_handler = MessageHandler(
    filters=filters.TEXT & ~filters.COMMAND,
    callback=handle_handover_time_text,
)
# Step 2: the schedule uploaded as a document with a .json extension.
duty_schedule_document_handler = MessageHandler(
    filters=filters.Document.FileExtension("json"),
    callback=handle_duty_schedule_document,
)