Files
duty-teller/handlers/import_duty_schedule.py
Nikolay Tatarinov 7a963eccd1 Add event type handling for duties in the system
- Introduced a new `event_type` column in the `duties` table to categorize duties as 'duty', 'unavailable', or 'vacation'.
- Updated the duty schedule import functionality to parse and store event types from the JSON input.
- Enhanced the API response to include event types for each duty, improving the calendar display logic.
- Modified the web application to visually differentiate between duty types in the calendar and duty list.
- Updated tests to cover new event type functionality and ensure correct parsing and storage of duties.
- Revised README documentation to reflect changes in duty event types and their representation in the system.
2026-02-17 23:01:07 +03:00

197 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Import duty-schedule: /import_duty_schedule (admin only). Two steps: handover time -> JSON file."""
import asyncio
import re
from datetime import date, datetime, timedelta, timezone
import config
from telegram import Update
from telegram.ext import CommandHandler, ContextTypes, MessageHandler, filters
from db.session import get_session
from db.repository import (
get_or_create_user_by_full_name,
delete_duties_in_range,
insert_duty,
)
from importers.duty_schedule import (
DutyScheduleParseError,
DutyScheduleResult,
parse_duty_schedule,
)
# HH:MM or HH:MM:SS, optional space + timezone (IANA or "UTC")
HANDOVER_TIME_RE = re.compile(
r"^\s*(\d{1,2}):(\d{2})(?::(\d{2}))?\s*(?:\s+(\S+))?\s*$", re.IGNORECASE
)
def _parse_handover_time(text: str) -> tuple[int, int] | None:
"""Parse handover time string to (hour_utc, minute_utc). Returns None on failure."""
m = HANDOVER_TIME_RE.match(text)
if not m:
return None
hour = int(m.group(1))
minute = int(m.group(2))
# second = m.group(3) ignored
tz_str = (m.group(4) or "").strip()
if not tz_str or tz_str.upper() == "UTC":
return (hour % 24, minute)
try:
from zoneinfo import ZoneInfo
except ImportError:
try:
from backports.zoneinfo import ZoneInfo # type: ignore
except ImportError:
return None
try:
tz = ZoneInfo(tz_str)
except Exception:
return None
# Build datetime in that tz and convert to UTC
dt = datetime(2000, 1, 1, hour, minute, 0, tzinfo=tz)
utc = dt.astimezone(timezone.utc)
return (utc.hour, utc.minute)
def _duty_to_iso(d: date, hour_utc: int, minute_utc: int) -> str:
"""ISO 8601 with Z for start of duty on date d at given UTC time."""
dt = datetime(d.year, d.month, d.day, hour_utc, minute_utc, 0, tzinfo=timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
async def import_duty_schedule_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /import_duty_schedule: start the import dialog for admins.

    Non-admin senders get a refusal message. For admins the
    ``awaiting_handover_time`` flag is set in ``context.user_data`` and the
    user is asked for the handover time.
    """
    message = update.message
    sender = update.effective_user
    if not message or not sender:
        return
    if config.is_admin(sender.username or ""):
        # Step 1 of the dialog: the next text message is read as the handover time.
        context.user_data["awaiting_handover_time"] = True
        await message.reply_text(
            "Укажите время пересменки в формате ЧЧ:ММ и часовой пояс, "
            "например 09:00 Europe/Moscow или 06:00 UTC."
        )
    else:
        await message.reply_text("Доступ только для администраторов.")
async def handle_handover_time_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Consume the handover time sent as plain text (step 1 of the import dialog).

    Does nothing unless ``awaiting_handover_time`` is set in
    ``context.user_data`` and the sender is an admin. On a successful parse the
    UTC time is stored under ``handover_utc_time`` and the dialog moves on to
    waiting for the schedule file; otherwise the user is asked to retry.
    """
    message = update.message
    sender = update.effective_user
    if not message or not sender or not message.text:
        return
    state = context.user_data
    if not state.get("awaiting_handover_time"):
        return
    if not config.is_admin(sender.username or ""):
        return
    handover = _parse_handover_time(message.text.strip())
    if handover is None:
        # Keep the awaiting flag set so the user can simply send the time again.
        await message.reply_text(
            "Не удалось разобрать время. Укажите, например: 09:00 Europe/Moscow"
        )
        return
    hour_utc, minute_utc = handover
    # Advance the dialog: time received, now expect the JSON document.
    state.update(
        {
            "handover_utc_time": (hour_utc, minute_utc),
            "awaiting_handover_time": False,
            "awaiting_duty_schedule_file": True,
        }
    )
    await message.reply_text(
        "Отправьте файл в формате duty-schedule (JSON)."
    )
def _run_import(
    database_url: str,
    result: DutyScheduleResult,
    hour_utc: int,
    minute_utc: int,
) -> tuple[int, int, int, int]:
    """Write a parsed duty schedule to the database.

    For every entry the user is looked up (or created) by full name, that
    user's duties in the schedule's date range are deleted, and one event per
    date is inserted. Each event runs from the handover time on its date to the
    handover time on the following day.

    Returns:
        ``(num_users, num_duty, num_unavailable, num_vacation)``.
    """
    # (event_type, name of the entry attribute holding its dates), in insert order.
    event_sources = (
        ("duty", "duty_dates"),
        ("unavailable", "unavailable_dates"),
        ("vacation", "vacation_dates"),
    )
    session = get_session(database_url)
    try:
        range_start = result.start_date.isoformat()
        range_end = result.end_date.isoformat()
        inserted = {event_type: 0 for event_type, _ in event_sources}
        for entry in result.entries:
            user = get_or_create_user_by_full_name(session, entry.full_name)
            # Replace, not merge: drop this user's existing duties in the range first.
            delete_duties_in_range(session, user.id, range_start, range_end)
            for event_type, dates_attr in event_sources:
                for day in getattr(entry, dates_attr):
                    start_at = _duty_to_iso(day, hour_utc, minute_utc)
                    following_day = day + timedelta(days=1)
                    end_at = _duty_to_iso(following_day, hour_utc, minute_utc)
                    insert_duty(
                        session, user.id, start_at, end_at, event_type=event_type
                    )
                    inserted[event_type] += 1
        return (
            len(result.entries),
            inserted["duty"],
            inserted["unavailable"],
            inserted["vacation"],
        )
    finally:
        session.close()
async def handle_duty_schedule_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Receive the duty-schedule JSON file and run the import (step 2 of the dialog).

    Does nothing unless ``awaiting_duty_schedule_file`` and
    ``handover_utc_time`` are present in ``context.user_data`` and the sender
    is an admin. The file is downloaded, parsed, and imported in an executor;
    the user gets either a summary or an error message. Dialog state is
    cleared after a parse error and after any import attempt.
    """
    message = update.message
    if not message or not message.document or not update.effective_user:
        return
    state = context.user_data
    if not state.get("awaiting_duty_schedule_file"):
        return
    handover = state.get("handover_utc_time")
    if not handover or not config.is_admin(update.effective_user.username or ""):
        return
    document = message.document
    if not (document.file_name or "").lower().endswith(".json"):
        # Dialog state is kept, so the user can resend a proper file.
        await message.reply_text("Нужен файл с расширением .json")
        return
    hour_utc, minute_utc = handover
    file_id = document.file_id
    # Download and parse in async context
    tg_file = await context.bot.get_file(file_id)
    raw = bytes(await tg_file.download_as_bytearray())
    try:
        result = parse_duty_schedule(raw)
    except DutyScheduleParseError as e:
        state.pop("awaiting_duty_schedule_file", None)
        state.pop("handover_utc_time", None)
        await message.reply_text(f"Ошибка разбора файла: {e}")
        return
    loop = asyncio.get_running_loop()
    try:
        # The import is synchronous, so it runs in the default executor.
        counts = await loop.run_in_executor(
            None,
            _run_import,
            config.DATABASE_URL,
            result,
            hour_utc,
            minute_utc,
        )
        num_users, num_duty, num_unavailable, num_vacation = counts
    except Exception as e:
        await message.reply_text(f"Ошибка импорта: {e}")
    else:
        total = sum((num_duty, num_unavailable, num_vacation))
        summary = [f"{num_users} пользователей", f"{num_duty} дежурств"]
        # Optional categories are only mentioned when non-zero.
        if num_unavailable:
            summary.append(f"{num_unavailable} недоступностей")
        if num_vacation:
            summary.append(f"{num_vacation} отпусков")
        await message.reply_text(
            "Импорт выполнен: " + ", ".join(summary) + f" (всего {total} событий)."
        )
    finally:
        # Whatever the outcome, the dialog is over.
        state.pop("awaiting_duty_schedule_file", None)
        state.pop("handover_utc_time", None)
# Handler objects exported by this module.

# Entry point of the dialog: the /import_duty_schedule command.
import_duty_schedule_handler = CommandHandler("import_duty_schedule", import_duty_schedule_cmd)
# Step 1: text messages that are not commands. The callback itself ignores
# them unless the "awaiting_handover_time" flag is set in user_data.
handover_time_handler = MessageHandler(
filters.TEXT & ~filters.COMMAND,
handle_handover_time_text,
)
# Step 2: documents with the "json" file extension. The callback itself ignores
# them unless the "awaiting_duty_schedule_file" flag is set in user_data.
duty_schedule_document_handler = MessageHandler(
filters.Document.FileExtension("json"),
handle_duty_schedule_document,
)